Avoid re-reading parquet footer for iceberg stats in writer
Currently we read the parquet file footer back from the file system
just after writing it, in order to compute Iceberg stats.
Since the Trino parquet writer already holds the file footer in memory,
there is no need to perform extra IO to get the footer again.
raunaqmorarka committed Sep 21, 2023
1 parent 9a55d5d commit 5e4bee9
Showing 4 changed files with 91 additions and 38 deletions.
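
For context, a minimal sketch (not part of the commit) contrasting the old and new ways of computing Iceberg metrics. It uses only APIs that appear in the diff below (ParquetUtil.fileMetrics, ParquetUtil.footerMetrics, ParquetMetadataConverter); the class and method names of the sketch itself are made up:

import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.io.InputFile;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.IOException;
import java.util.stream.Stream;

import static org.apache.iceberg.parquet.ParquetUtil.fileMetrics;
import static org.apache.iceberg.parquet.ParquetUtil.footerMetrics;

class IcebergFooterMetricsSketch
{
    // Old path: re-open the file that was just written and parse its footer (extra IO)
    static Metrics byRereadingFooter(InputFile writtenFile, MetricsConfig metricsConfig)
    {
        return fileMetrics(writtenFile, metricsConfig);
    }

    // New path: convert the Thrift footer the writer already holds in memory and derive metrics from it
    static Metrics fromInMemoryFooter(FileMetaData footer, MetricsConfig metricsConfig)
            throws IOException
    {
        ParquetMetadata parquetMetadata = new ParquetMetadataConverter().fromParquetMetadata(footer);
        return footerMetrics(parquetMetadata, Stream.empty(), metricsConfig);
    }
}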
@@ -29,6 +29,7 @@
import io.trino.parquet.writer.ColumnWriter.BufferData;
import io.trino.spi.Page;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.format.ColumnMetaData;
import org.apache.parquet.format.CompressionCodec;
@@ -80,20 +81,20 @@ public class ParquetWriter
private final OutputStreamSliceOutput outputStream;
private final ParquetWriterOptions writerOption;
private final MessageType messageType;
private final String createdBy;
private final int chunkMaxLogicalBytes;
private final Map<List<String>, Type> primitiveTypes;
private final CompressionCodec compressionCodec;
private final Optional<DateTimeZone> parquetTimeZone;

private final ImmutableList.Builder<RowGroup> rowGroupBuilder = ImmutableList.builder();
private final FileFooter fileFooter;
private final Optional<ParquetWriteValidationBuilder> validationBuilder;

private List<ColumnWriter> columnWriters;
private int rows;
private long bufferedBytes;
private boolean closed;
private boolean writeHeader;
@Nullable
private FileMetaData fileMetaData;

public static final Slice MAGIC = wrappedBuffer("PAR1".getBytes(US_ASCII));

@@ -114,7 +115,8 @@ public ParquetWriter(
this.writerOption = requireNonNull(writerOption, "writerOption is null");
this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null");
this.parquetTimeZone = requireNonNull(parquetTimeZone, "parquetTimeZone is null");
this.createdBy = formatCreatedBy(requireNonNull(trinoVersion, "trinoVersion is null"));
String createdBy = formatCreatedBy(requireNonNull(trinoVersion, "trinoVersion is null"));
this.fileFooter = new FileFooter(messageType, createdBy, parquetTimeZone);

recordValidation(validation -> validation.setTimeZone(parquetTimeZone.map(DateTimeZone::getID)));
recordValidation(validation -> validation.setColumns(messageType.getColumns()));
@@ -229,6 +231,12 @@ public void validate(ParquetDataSource input)
}
}

public FileMetaData getFileMetaData()
{
checkState(closed, "fileMetaData is available only after writer is closed");
return requireNonNull(fileMetaData, "fileMetaData is null");
}

private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetadata parquetMetadata, ParquetWriteValidation writeValidation)
throws IOException
{
@@ -330,9 +338,9 @@ private void writeFooter()
throws IOException
{
checkState(closed);
List<RowGroup> rowGroups = rowGroupBuilder.build();
Slice footer = getFooter(rowGroups, messageType);
recordValidation(validation -> validation.setRowGroups(rowGroups));
fileMetaData = fileFooter.createFileMetadata();
Slice footer = serializeFooter(fileMetaData);
recordValidation(validation -> validation.setRowGroups(fileMetaData.getRow_groups()));
createDataOutput(footer).writeData(outputStream);

Slice footerSize = Slices.allocate(SIZE_OF_INT);
@@ -342,31 +350,20 @@ private void writeFooter()
createDataOutput(MAGIC).writeData(outputStream);
}

Slice getFooter(List<RowGroup> rowGroups, MessageType messageType)
throws IOException
{
FileMetaData fileMetaData = new FileMetaData();
fileMetaData.setVersion(1);
fileMetaData.setCreated_by(createdBy);
fileMetaData.setSchema(MessageTypeConverter.toParquetSchema(messageType));
// Added based on org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport
parquetTimeZone.ifPresent(dateTimeZone -> fileMetaData.setKey_value_metadata(
ImmutableList.of(new KeyValue("writer.time.zone").setValue(dateTimeZone.getID()))));
long totalRows = rowGroups.stream().mapToLong(RowGroup::getNum_rows).sum();
fileMetaData.setNum_rows(totalRows);
fileMetaData.setRow_groups(ImmutableList.copyOf(rowGroups));

DynamicSliceOutput dynamicSliceOutput = new DynamicSliceOutput(40);
Util.writeFileMetaData(fileMetaData, dynamicSliceOutput);
return dynamicSliceOutput.slice();
}

private void updateRowGroups(List<ColumnMetaData> columnMetaData)
{
long totalCompressedBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_compressed_size).sum();
long totalBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_uncompressed_size).sum();
ImmutableList<org.apache.parquet.format.ColumnChunk> columnChunks = columnMetaData.stream().map(ParquetWriter::toColumnChunk).collect(toImmutableList());
rowGroupBuilder.add(new RowGroup(columnChunks, totalBytes, rows).setTotal_compressed_size(totalCompressedBytes));
fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows).setTotal_compressed_size(totalCompressedBytes));
}

private static Slice serializeFooter(FileMetaData fileMetaData)
throws IOException
{
DynamicSliceOutput dynamicSliceOutput = new DynamicSliceOutput(40);
Util.writeFileMetaData(fileMetaData, dynamicSliceOutput);
return dynamicSliceOutput.slice();
}

private static org.apache.parquet.format.ColumnChunk toColumnChunk(ColumnMetaData metaData)
@@ -395,4 +392,45 @@ private void initColumnWriters()

this.columnWriters = ParquetWriters.getColumnWriters(messageType, primitiveTypes, parquetProperties, compressionCodec, parquetTimeZone);
}

private static class FileFooter
{
private final MessageType messageType;
private final String createdBy;
private final Optional<DateTimeZone> parquetTimeZone;

@Nullable
private ImmutableList.Builder<RowGroup> rowGroupBuilder = ImmutableList.builder();

private FileFooter(MessageType messageType, String createdBy, Optional<DateTimeZone> parquetTimeZone)
{
this.messageType = messageType;
this.createdBy = createdBy;
this.parquetTimeZone = parquetTimeZone;
}

public void addRowGroup(RowGroup rowGroup)
{
checkState(rowGroupBuilder != null, "rowGroupBuilder is null");
rowGroupBuilder.add(rowGroup);
}

public FileMetaData createFileMetadata()
{
checkState(rowGroupBuilder != null, "rowGroupBuilder is null");
List<RowGroup> rowGroups = rowGroupBuilder.build();
rowGroupBuilder = null;
long totalRows = rowGroups.stream().mapToLong(RowGroup::getNum_rows).sum();
FileMetaData fileMetaData = new FileMetaData(
1,
MessageTypeConverter.toParquetSchema(messageType),
totalRows,
ImmutableList.copyOf(rowGroups));
fileMetaData.setCreated_by(createdBy);
// Added based on org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport
parquetTimeZone.ifPresent(dateTimeZone -> fileMetaData.setKey_value_metadata(
ImmutableList.of(new KeyValue("writer.time.zone").setValue(dateTimeZone.getID()))));
return fileMetaData;
}
}
}
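
A brief usage sketch of the new ParquetWriter#getFileMetaData accessor (hypothetical caller; the helper name and the pages input are made up, and the writer's existing write(Page) and close() methods are assumed). The footer is only available after close(), as enforced by the checkState above:

import io.trino.parquet.writer.ParquetWriter;
import io.trino.spi.Page;
import org.apache.parquet.format.FileMetaData;

import java.io.IOException;
import java.util.List;

class FooterAccessSketch
{
    // Hypothetical helper: write all pages, close the writer, then read the in-memory footer
    static FileMetaData writeAndGetFooter(ParquetWriter writer, List<Page> pages)
            throws IOException
    {
        for (Page page : pages) {
            writer.write(page);
        }
        writer.close();
        // Calling getFileMetaData() before close() would fail its checkState guard
        return writer.getFileMetaData();
    }
}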
@@ -27,6 +27,7 @@
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.Type;
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.schema.MessageType;
import org.joda.time.DateTimeZone;

@@ -198,4 +199,9 @@ public String toString()
.add("writer", parquetWriter)
.toString();
}

public FileMetaData getFileMetadata()
{
return parquetWriter.getFileMetaData();
}
}
@@ -191,8 +191,7 @@ private IcebergFileWriter createParquetWriter(
parquetWriterOptions,
IntStream.range(0, fileColumnNames.size()).toArray(),
getCompressionCodec(session).getParquetCompressionCodec(),
nodeVersion.toString(),
fileSystem);
nodeVersion.toString());
}
catch (IOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
@@ -13,34 +13,38 @@
*/
package io.trino.plugin.iceberg;

import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.type.Type;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.io.InputFile;
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.parquet.ParquetUtil.fileMetrics;
import static org.apache.iceberg.parquet.ParquetUtil.footerMetrics;

public final class IcebergParquetFileWriter
implements IcebergFileWriter
{
private final MetricsConfig metricsConfig;
private final InputFile inputFile;
private final ParquetFileWriter parquetFileWriter;
private final Location location;

public IcebergParquetFileWriter(
MetricsConfig metricsConfig,
@@ -53,8 +57,7 @@ public IcebergParquetFileWriter(
ParquetWriterOptions parquetWriterOptions,
int[] fileInputColumnIndexes,
CompressionCodec compressionCodec,
String trinoVersion,
TrinoFileSystem fileSystem)
String trinoVersion)
throws IOException
{
this.parquetFileWriter = new ParquetFileWriter(
@@ -70,14 +73,21 @@ public IcebergParquetFileWriter(
trinoVersion,
Optional.empty(),
Optional.empty());
this.location = outputFile.location();
this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null");
this.inputFile = new ForwardingInputFile(fileSystem.newInputFile(outputFile.location()));
}

@Override
public Metrics getMetrics()
{
return fileMetrics(inputFile, metricsConfig);
ParquetMetadata parquetMetadata;
try {
parquetMetadata = new ParquetMetadataConverter().fromParquetMetadata(parquetFileWriter.getFileMetadata());
}
catch (IOException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error creating metadata for Parquet file %s", location), e);
}
return footerMetrics(parquetMetadata, Stream.empty(), metricsConfig);
}

@Override
