Extend Parquet reader system to add an optional row-index column
This is necessary to allow predicate pushdown alongside row-level
deletes on Parquet data (such as those allowed by the Iceberg v2 draft).

Previously, the row-group filter made it impossible for a downstream
connector to correctly identify rows once a predicate had been passed
to the Parquet reader.
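
As a rough sketch of the intended downstream use (hypothetical code, not
part of this commit): a connector that receives positional deletes could
combine them with the synthesized row-index column along these lines,
where PositionalDeleteSketch, keptPositions, and deletedPositions are
placeholder names.

    import java.util.Arrays;
    import java.util.Set;

    // Hypothetical illustration only; not part of the API introduced here.
    final class PositionalDeleteSketch
    {
        private PositionalDeleteSketch() {}

        // Given the global file positions of the rows in a batch (read from the
        // row-index column) and the set of deleted positions for the file, return
        // the batch-relative positions that survive the delete filter.
        static int[] keptPositions(long[] rowIndices, Set<Long> deletedPositions)
        {
            int[] kept = new int[rowIndices.length];
            int count = 0;
            for (int position = 0; position < rowIndices.length; position++) {
                if (!deletedPositions.contains(rowIndices[position])) {
                    kept[count++] = position;
                }
            }
            return Arrays.copyOf(kept, count);
        }
    }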
jirassimok authored and findepi committed Apr 12, 2021
1 parent bd66e4a commit 0dbb2b5
Showing 4 changed files with 125 additions and 13 deletions.
@@ -71,6 +71,7 @@ public class ParquetReader

private final Optional<String> fileCreatedBy;
private final List<BlockMetaData> blocks;
private final Optional<List<Long>> firstRowsOfBlocks;
private final List<PrimitiveColumnIO> columns;
private final ParquetDataSource dataSource;
private final DateTimeZone timeZone;
@@ -79,6 +80,13 @@ public class ParquetReader
private int currentRowGroup = -1;
private BlockMetaData currentBlockMetadata;
private long currentGroupRowCount;
/**
* Index in the Parquet file of the first row of the current group
*/
private Optional<Long> firstRowIndexInGroup = Optional.empty();
/**
* Index in the current group of the next row
*/
private long nextRowInGroup;
private int batchSize;
private int nextBatchSize = INITIAL_BATCH_SIZE;
@@ -95,6 +103,7 @@ public ParquetReader(
Optional<String> fileCreatedBy,
MessageColumnIO messageColumnIO,
List<BlockMetaData> blocks,
Optional<List<Long>> firstRowsOfBlocks,
ParquetDataSource dataSource,
DateTimeZone timeZone,
AggregatedMemoryContext systemMemoryContext,
@@ -104,6 +113,7 @@ public ParquetReader(
this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
this.columns = requireNonNull(messageColumnIO, "messageColumnIO is null").getLeaves();
this.blocks = requireNonNull(blocks, "blocks is null");
this.firstRowsOfBlocks = requireNonNull(firstRowsOfBlocks, "firstRowsOfBlocks is null");
this.dataSource = requireNonNull(dataSource, "dataSource is null");
this.timeZone = requireNonNull(timeZone, "timeZone is null");
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
@@ -112,6 +122,10 @@ public ParquetReader(
this.columnReaders = new PrimitiveColumnReader[columns.size()];
this.maxBytesPerCell = new long[columns.size()];

firstRowsOfBlocks.ifPresent(firstRows -> {
checkArgument(blocks.size() == firstRows.size(), "elements of firstRowsOfBlocks must correspond to blocks");
});

Map<ChunkKey, DiskRange> ranges = new HashMap<>();
for (int rowGroup = 0; rowGroup < blocks.size(); rowGroup++) {
BlockMetaData metadata = blocks.get(rowGroup);
@@ -135,6 +149,15 @@ public void close()
dataSource.close();
}

/**
* Get the global row index of the first row in the last batch.
*/
public long lastBatchStartRow()
{
long baseIndex = firstRowIndexInGroup.orElseThrow(() -> new IllegalStateException("row index unavailable"));
return baseIndex + nextRowInGroup - batchSize;
}
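
// Worked example (hypothetical numbers): if the current row group starts at
// global row 1000, nextRowInGroup is 320 after the latest batch was read, and
// that batch contained 128 rows, then lastBatchStartRow() returns
// 1000 + 320 - 128 = 1192.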

public int nextBatch()
{
if (nextRowInGroup >= currentGroupRowCount && !advanceToNextRowGroup()) {
@@ -162,7 +185,7 @@ private boolean advanceToNextRowGroup()
return false;
}
currentBlockMetadata = blocks.get(currentRowGroup);

firstRowIndexInGroup = firstRowsOfBlocks.map(firstRows -> firstRows.get(currentRowGroup));
nextRowInGroup = 0L;
currentGroupRowCount = currentBlockMetadata.getRowCount();
initializeColumnReaders();
@@ -14,6 +14,7 @@
package io.trino.plugin.hive.parquet;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import io.trino.parquet.Field;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.reader.ParquetReader;
@@ -22,6 +23,7 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.LazyBlock;
import io.trino.spi.block.LazyBlockLoader;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.type.Type;
@@ -31,10 +33,12 @@
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static java.lang.String.format;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;

public class ParquetPageSource
@@ -43,15 +47,53 @@ public class ParquetPageSource
private final ParquetReader parquetReader;
private final List<Type> types;
private final List<Optional<Field>> fields;
/**
* Indicates whether the column at each index should be populated with the
* indices of its rows
*/
private final List<Boolean> rowIndexLocations;

private int batchId;
private boolean closed;

public ParquetPageSource(ParquetReader parquetReader, List<Type> types, List<Optional<Field>> fields)
{
this(parquetReader, types, nCopies(types.size(), false), fields);
}

/**
* @param types Column types
* @param rowIndexLocations Whether each column should be populated with the indices of its rows
* @param fields List of field descriptions. Empty optionals will result in columns populated with {@code NULL}
*/
public ParquetPageSource(
ParquetReader parquetReader,
List<Type> types,
List<Boolean> rowIndexLocations,
List<Optional<Field>> fields)
{
this.parquetReader = requireNonNull(parquetReader, "parquetReader is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
this.rowIndexLocations = requireNonNull(rowIndexLocations, "rowIndexLocations is null");
this.fields = ImmutableList.copyOf(requireNonNull(fields, "fields is null"));

// TODO: Instead of checking that the three list arguments go together correctly,
// we should do something like the ORC reader's ColumnAdaptation, using
// subclasses that contain only the necessary information for each column.
checkArgument(
types.size() == rowIndexLocations.size() && types.size() == fields.size(),
"types, rowIndexLocations, and fields must correspond one-to-one-to-one");
Streams.forEachPair(
rowIndexLocations.stream(),
fields.stream(),
(isIndexColumn, field) -> checkArgument(
!(isIndexColumn && field.isPresent()),
"Field info for row index column must be empty Optional"));
}

private boolean isIndexColumn(int column)
{
return rowIndexLocations.get(column);
}

@Override
@@ -91,11 +133,16 @@ public Page getNextPage()
}

Block[] blocks = new Block[fields.size()];
for (int fieldId = 0; fieldId < blocks.length; fieldId++) {
Type type = types.get(fieldId);
blocks[fieldId] = fields.get(fieldId)
.<Block>map(field -> new LazyBlock(batchSize, new ParquetBlockLoader(field)))
.orElseGet(() -> RunLengthEncodedBlock.create(type, null, batchSize));
for (int column = 0; column < blocks.length; column++) {
if (isIndexColumn(column)) {
blocks[column] = getRowIndexColumn(parquetReader.lastBatchStartRow(), batchSize);
}
else {
Type type = types.get(column);
blocks[column] = fields.get(column)
.<Block>map(field -> new LazyBlock(batchSize, new ParquetBlockLoader(field)))
.orElseGet(() -> RunLengthEncodedBlock.create(type, null, batchSize));
}
}
return new Page(batchSize, blocks);
}
@@ -177,4 +224,13 @@ public final Block load()
return block;
}
}

private static Block getRowIndexColumn(long baseIndex, int size)
{
long[] rowIndices = new long[size];
for (int position = 0; position < size; position++) {
rowIndices[position] = baseIndex + position;
}
return new LongArrayBlock(size, Optional.empty(), rowIndices);
}
}
@@ -31,6 +31,7 @@
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HivePageSourceFactory;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.ReaderColumns;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.acid.AcidTransaction;
@@ -86,6 +87,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableList;
@@ -94,6 +96,20 @@
public class ParquetPageSourceFactory
implements HivePageSourceFactory
{
/**
* If this object is passed as one of the columns for {@code createPageSource},
* it will be populated as an additional column containing the index of each
* row read.
*/
public static final HiveColumnHandle PARQUET_ROW_INDEX_COLUMN = new HiveColumnHandle(
"$parquet$row_index",
-1, // no real column index
HiveType.HIVE_LONG,
BIGINT,
Optional.empty(),
HiveColumnHandle.ColumnType.SYNTHESIZED,
Optional.empty());
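
// Usage sketch (hypothetical caller, not part of this commit): include this
// handle among the requested columns so the page source synthesizes a BIGINT
// column holding each row's position within the Parquet file, for example:
//
//   List<HiveColumnHandle> columns = ImmutableList.<HiveColumnHandle>builder()
//           .addAll(dataColumns) // placeholder for the connector's regular columns
//           .add(PARQUET_ROW_INDEX_COLUMN)
//           .build();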

private static final Set<String> PARQUET_SERDE_CLASS_NAMES = ImmutableSet.<String>builder()
.add("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
.add("parquet.hive.serde.ParquetHiveSerDe")
@@ -211,18 +227,23 @@ public static ReaderPageSource createPageSource(

Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);

long nextStart = 0;
ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
if (start <= firstDataPage && firstDataPage < start + length
&& predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) {
blocks.add(block);
blockStarts.add(nextStart);
}
nextStart += block.getRowCount();
}
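// Example (hypothetical layout): with row groups of 100, 250, and 50 rows, the
// candidate starts are 0, 100, and 350. If the predicate prunes the middle group,
// only the first and third entries are kept, so the reader still reports the
// surviving group as starting at global row 350 and downstream positional deletes
// remain aligned with the original file.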
parquetReader = new ParquetReader(
Optional.ofNullable(fileMetaData.getCreatedBy()),
messageColumn,
blocks.build(),
Optional.of(blockStarts.build()),
dataSource,
timeZone,
newSimpleAggregatedMemoryContext(),
@@ -261,21 +282,32 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
.orElse(columns);

for (HiveColumnHandle column : baseColumns) {
checkArgument(column.getColumnType() == REGULAR, "column type must be REGULAR: %s", column);
checkArgument(column == PARQUET_ROW_INDEX_COLUMN || column.getColumnType() == REGULAR, "column type must be REGULAR: %s", column);
}

ImmutableList.Builder<Type> trinoTypes = ImmutableList.builder();
ImmutableList.Builder<Optional<Field>> internalFields = ImmutableList.builder();
ImmutableList.Builder<Boolean> rowIndexColumns = ImmutableList.builder();
for (HiveColumnHandle column : baseColumns) {
trinoTypes.add(column.getBaseType());
internalFields.add(Optional.ofNullable(getParquetType(column, fileSchema, useColumnNames))
.flatMap(field -> {
String columnName = useColumnNames ? column.getBaseColumnName() : fileSchema.getFields().get(column.getBaseHiveColumnIndex()).getName();
return constructField(column.getBaseType(), lookupColumnByName(messageColumn, columnName));
}));
rowIndexColumns.add(column == PARQUET_ROW_INDEX_COLUMN);
if (column == PARQUET_ROW_INDEX_COLUMN) {
internalFields.add(Optional.empty());
}
else {
internalFields.add(Optional.ofNullable(getParquetType(column, fileSchema, useColumnNames))
.flatMap(field -> {
String columnName = useColumnNames ? column.getBaseColumnName() : fileSchema.getFields().get(column.getBaseHiveColumnIndex()).getName();
return constructField(column.getBaseType(), lookupColumnByName(messageColumn, columnName));
}));
}
}

ConnectorPageSource parquetPageSource = new ParquetPageSource(parquetReader, trinoTypes.build(), internalFields.build());
ConnectorPageSource parquetPageSource = new ParquetPageSource(
parquetReader,
trinoTypes.build(),
rowIndexColumns.build(),
internalFields.build());
return new ReaderPageSource(parquetPageSource, readerProjections);
}

@@ -482,6 +482,7 @@ private static ConnectorPageSource createParquetPageSource(
Optional.ofNullable(fileMetaData.getCreatedBy()),
messageColumnIO,
blocks,
Optional.empty(),
dataSource,
UTC,
systemMemoryContext,