
Commit

address comments
Huaxin Gao committed May 16, 2024
1 parent ee398b5 commit 716e73a
Showing 8 changed files with 104 additions and 52 deletions.
2 changes: 2 additions & 0 deletions spark/v3.4/build.gradle
@@ -52,6 +52,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
annotationProcessor libs.immutables.value
compileOnly libs.immutables.value
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-data')
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface OrcBatchReadConf extends Serializable {
int batchSize();
}
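
The @Value.Immutable annotation, together with the annotationProcessor/compileOnly Immutables dependencies added to build.gradle above, has the annotation processor generate an ImmutableOrcBatchReadConf class with a builder; later hunks in this commit construct the conf through it. A minimal usage sketch, not part of the commit (class name and batch size are illustrative):

package org.apache.iceberg.spark;

// Illustrative sketch: the Immutables processor generates ImmutableOrcBatchReadConf
// from the @Value.Immutable interface above, including a builder for its attributes.
public class OrcBatchReadConfExample {
  public static void main(String[] args) {
    OrcBatchReadConf orcConf =
        ImmutableOrcBatchReadConf.builder()
            .batchSize(5000) // illustrative value; real callers pass SparkReadConf.orcBatchSize()
            .build();
    System.out.println("ORC batch size: " + orcConf.batchSize());
  }
}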
@@ -19,29 +19,11 @@
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

public class BatchReadConf implements Serializable {
@Value.Immutable
public interface ParquetBatchReadConf extends Serializable {
int batchSize();

private final int parquetBatchSize;
private final ParquetReaderType parquetReaderType;
private final int orcBatchSize;

public BatchReadConf(
int parquetBatchSize, ParquetReaderType parquetReaderType, int orcBatchSize) {
this.parquetBatchSize = parquetBatchSize;
this.parquetReaderType = parquetReaderType;
this.orcBatchSize = orcBatchSize;
}

public int parquetBatchSize() {
return parquetBatchSize;
}

public int orcBatchSize() {
return orcBatchSize;
}

public ParquetReaderType parquetReaderType() {
return parquetReaderType;
}
ParquetReaderType readerType();
}
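
The Parquet side is built the same way, now carrying the reader type that previously lived in the removed BatchReadConf class. Both batchSize() and readerType() are required attributes, so the generated builder refuses to build a partially specified conf. A sketch under the same assumptions as above (class name and values are illustrative):

package org.apache.iceberg.spark;

// Illustrative sketch, not part of this commit.
public class ParquetBatchReadConfExample {
  public static void main(String[] args) {
    ParquetBatchReadConf parquetConf =
        ImmutableParquetBatchReadConf.builder()
            .batchSize(5000) // illustrative value; real callers pass SparkReadConf.parquetBatchSize()
            .readerType(ParquetReaderType.ICEBERG) // or ParquetReaderType.COMET
            .build();
    System.out.println(parquetConf.readerType() + " batches of " + parquetConf.batchSize());

    try {
      // Leaving readerType() unset violates a required attribute; Immutables-generated
      // builders throw IllegalStateException from build() in that case.
      ImmutableParquetBatchReadConf.builder().batchSize(5000).build();
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}

Note that the generated builder only enforces that the attributes are set; the "Batch size must be > 1" range check dropped from SparkReadConf.batchReadConf() in the next hunk is not reproduced by it.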
@@ -26,7 +26,6 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
@@ -355,18 +354,11 @@ private boolean executorCacheLocalityEnabledInternal() {
.parse();
}

private ParquetReaderType parquetReaderType() {
public ParquetReaderType parquetReaderType() {
return confParser
.enumConf(ParquetReaderType::valueOf)
.sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
.defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
.parse();
}

public BatchReadConf batchReadConf() {
int parquetBatchSize = parquetBatchSize();
int orcBatchSize = orcBatchSize();
Preconditions.checkArgument(parquetBatchSize > 1 && orcBatchSize > 1, "Batch size must be > 1");
return new BatchReadConf(parquetBatchSize, parquetReaderType(), orcBatchSize);
}
}
@@ -32,25 +32,29 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.BatchReadConf;
import org.apache.iceberg.spark.OrcBatchReadConf;
import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
private final BatchReadConf conf;
private final ParquetBatchReadConf parquetConf;
private final OrcBatchReadConf orcConf;

BaseBatchReader(
Table table,
ScanTaskGroup<T> taskGroup,
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
BatchReadConf conf) {
ParquetBatchReadConf parquetConf,
OrcBatchReadConf orcConf) {
super(table, taskGroup, tableSchema, expectedSchema, caseSensitive);
this.conf = conf;
this.parquetConf = parquetConf;
this.orcConf = orcConf;
}

protected CloseableIterable<ColumnarBatch> newBatchIterable(
@@ -89,15 +93,15 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
.split(start, length)
.createBatchedReaderFunc(
fileSchema -> {
if (conf.parquetReaderType() == ParquetReaderType.COMET) {
if (parquetConf.readerType() == ParquetReaderType.COMET) {
return VectorizedSparkParquetReaders.buildCometReader(
requiredSchema, fileSchema, idToConstant, deleteFilter);
} else {
return VectorizedSparkParquetReaders.buildReader(
requiredSchema, fileSchema, idToConstant, deleteFilter);
}
})
.recordsPerBatch(conf.parquetBatchSize())
.recordsPerBatch(parquetConf.batchSize())
.filter(residual)
.caseSensitive(caseSensitive())
// Spark eagerly consumes the batches. So the underlying memory allocated could be reused
@@ -127,7 +131,7 @@ private CloseableIterable<ColumnarBatch> newOrcIterable(
.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
.recordsPerBatch(conf.orcBatchSize())
.recordsPerBatch(orcConf.batchSize())
.filter(residual)
.caseSensitive(caseSensitive())
.withNameMapping(nameMapping())
@@ -28,7 +28,8 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.BatchReadConf;
import org.apache.iceberg.spark.OrcBatchReadConf;
import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
import org.apache.iceberg.util.SnapshotUtil;
@@ -46,14 +47,18 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>

private final long numSplits;

BatchDataReader(SparkInputPartition partition, BatchReadConf conf) {
BatchDataReader(
SparkInputPartition partition,
ParquetBatchReadConf parquetBatchReadConf,
OrcBatchReadConf orcBatchReadConf) {
this(
partition.table(),
partition.taskGroup(),
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
partition.expectedSchema(),
partition.isCaseSensitive(),
conf);
parquetBatchReadConf,
orcBatchReadConf);
}

BatchDataReader(
@@ -62,8 +67,9 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
BatchReadConf batchReadConf) {
super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, batchReadConf);
ParquetBatchReadConf parquetConf,
OrcBatchReadConf orcConf) {
super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, parquetConf, orcConf);

numSplits = taskGroup.tasks().size();
LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name());
@@ -28,8 +28,14 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
import org.apache.iceberg.spark.OrcBatchReadConf;
import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
@@ -113,17 +119,33 @@ private String[][] computePreferredLocations() {

@Override
public PartitionReaderFactory createReaderFactory() {
if (useParquetBatchReads()) {
return new SparkColumnarReaderFactory(readConf.batchReadConf());
if (useCometBatchReads()) {
return new SparkColumnarReaderFactory(
parquetBatchReadConf(readConf.parquetBatchSize(), ParquetReaderType.COMET));

} else if (useParquetBatchReads()) {
return new SparkColumnarReaderFactory(
parquetBatchReadConf(readConf.parquetBatchSize(), ParquetReaderType.ICEBERG));

} else if (useOrcBatchReads()) {
return new SparkColumnarReaderFactory(readConf.batchReadConf());
return new SparkColumnarReaderFactory(orcBatchReadConf(readConf.orcBatchSize()));

} else {
return new SparkRowReaderFactory();
}
}

private ParquetBatchReadConf parquetBatchReadConf(int batchSize, ParquetReaderType readerType) {
return ImmutableParquetBatchReadConf.builder()
.batchSize(batchSize)
.readerType(readerType)
.build();
}

private OrcBatchReadConf orcBatchReadConf(int batchSize) {
return ImmutableOrcBatchReadConf.builder().batchSize(batchSize).build();
}

// conditions for using Parquet batch reads:
// - Parquet vectorization is enabled
// - only primitives or metadata columns are projected
@@ -152,6 +174,17 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
}

private boolean useCometBatchReads() {
return readConf.parquetVectorizationEnabled()
&& readConf.parquetReaderType() == ParquetReaderType.COMET
&& expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
&& taskGroups.stream().allMatch(this::supportsParquetBatchReads);
}

private boolean supportsCometBatchReads(Types.NestedField field) {
return field.type().isPrimitiveType() && !field.type().typeId().equals(Type.TypeID.UUID);
}

// conditions for using ORC batch reads:
// - ORC vectorization is enabled
// - all tasks are of type FileScanTask and read only ORC files with no delete files
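
The new useCometBatchReads() branch above is driven by session configuration: Parquet vectorization must be enabled and the reader type read through the now-public SparkReadConf.parquetReaderType() must resolve to COMET. A hedged sketch of selecting the Comet path for a session, using only constants referenced in this diff (class name is illustrative):

package org.apache.iceberg.spark;

import org.apache.spark.sql.SparkSession;

// Illustrative sketch, not part of this commit: route eligible Iceberg Parquet scans
// through the Comet vectorized reader for this session. Assumes Parquet vectorization
// is already enabled for the table or session.
public class CometReaderTypeExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    spark.conf().set(SparkSQLProperties.PARQUET_READER_TYPE, ParquetReaderType.COMET.name());
    // Scans whose projected columns are all primitive and non-UUID (see
    // supportsCometBatchReads above) now get a COMET ParquetBatchReadConf.
    spark.stop();
  }
}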
@@ -20,18 +20,24 @@

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.BatchReadConf;
import org.apache.iceberg.spark.OrcBatchReadConf;
import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class SparkColumnarReaderFactory implements PartitionReaderFactory {
private final BatchReadConf conf;
private ParquetBatchReadConf parquetConf;
private OrcBatchReadConf orcConf;

SparkColumnarReaderFactory(BatchReadConf conf) {
this.conf = conf;
SparkColumnarReaderFactory(ParquetBatchReadConf conf) {
this.parquetConf = conf;
}

SparkColumnarReaderFactory(OrcBatchReadConf conf) {
this.orcConf = conf;
}

@Override
Expand All @@ -49,7 +55,7 @@ public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputP
SparkInputPartition partition = (SparkInputPartition) inputPartition;

if (partition.allTasksOfType(FileScanTask.class)) {
return new BatchDataReader(partition, conf);
return new BatchDataReader(partition, parquetConf, orcConf);
} else {
throw new UnsupportedOperationException(
"Unsupported task group for columnar reads: " + partition.taskGroup());
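
With BatchReadConf gone, SparkColumnarReaderFactory now takes exactly one of the two confs per scan and leaves the other field null; BatchDataReader receives both, and the BaseBatchReader hunks above only dereference the conf matching the file format of the task being read. A sketch of the two construction paths, placed in the same package because the factory is package-private (class name and batch sizes are illustrative):

package org.apache.iceberg.spark.source;

import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

// Illustrative sketch, not part of this commit: one factory per format-specific conf.
public class SparkColumnarReaderFactoryExample {
  public static void main(String[] args) {
    PartitionReaderFactory parquetFactory =
        new SparkColumnarReaderFactory(
            ImmutableParquetBatchReadConf.builder()
                .batchSize(5000) // illustrative value
                .readerType(ParquetReaderType.ICEBERG)
                .build());

    PartitionReaderFactory orcFactory =
        new SparkColumnarReaderFactory(
            ImmutableOrcBatchReadConf.builder().batchSize(2500).build()); // illustrative value

    System.out.println(parquetFactory + " and " + orcFactory + " created");
  }
}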
