From 716e73ad18af135b19fd1cb61b2e1d71b3d4519d Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Wed, 15 May 2024 23:35:05 -0700
Subject: [PATCH] address comments

---
 spark/v3.4/build.gradle                       |  2 +
 .../iceberg/spark/OrcBatchReadConf.java       | 27 +++++++++++++
 ...eadConf.java => ParquetBatchReadConf.java} | 28 ++++----------
 .../apache/iceberg/spark/SparkReadConf.java   | 10 +----
 .../iceberg/spark/source/BaseBatchReader.java | 18 +++++----
 .../iceberg/spark/source/BatchDataReader.java | 16 +++++---
 .../iceberg/spark/source/SparkBatch.java      | 39 +++++++++++++++++--
 .../source/SparkColumnarReaderFactory.java    | 16 +++++---
 8 files changed, 104 insertions(+), 52 deletions(-)
 create mode 100644 spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java
 rename spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/{BatchReadConf.java => ParquetBatchReadConf.java} (57%)

diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index cc0d8bf8c3d2..19c891ceffb6 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -52,6 +52,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
   dependencies {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     api project(':iceberg-api')
+    annotationProcessor libs.immutables.value
+    compileOnly libs.immutables.value
     implementation project(':iceberg-common')
     implementation project(':iceberg-core')
     implementation project(':iceberg-data')
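A note on the Gradle change above: `libs.immutables.value` is declared twice on purpose. `annotationProcessor` runs the Immutables processor at compile time, generating the `Immutable*` companion classes used later in this patch, while `compileOnly` keeps the `org.immutables.value.Value` annotations off the runtime classpath, so no runtime dependency is added. A minimal sketch of the pattern this enables (the `ExampleConf` name is illustrative, not from the patch):

```java
import java.io.Serializable;
import org.immutables.value.Value;

// The Immutables processor sees this interface at compile time and generates
// ImmutableExampleConf: a final class with a builder, equals/hashCode, and toString.
@Value.Immutable
public interface ExampleConf extends Serializable {
  int batchSize();
}
```

Callers then obtain instances through the generated builder, e.g. `ImmutableExampleConf.builder().batchSize(4096).build()` — the same shape `SparkBatch` uses below with `ImmutableParquetBatchReadConf` and `ImmutableOrcBatchReadConf`.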
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java
new file mode 100644
index 000000000000..d3b339d60e3f
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import org.immutables.value.Value;
+
+@Value.Immutable
+public interface OrcBatchReadConf extends Serializable {
+  int batchSize();
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BatchReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java
similarity index 57%
rename from spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BatchReadConf.java
rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java
index 2a1393b0cc0c..442d728d4d69 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BatchReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java
@@ -19,29 +19,11 @@
 package org.apache.iceberg.spark;
 
 import java.io.Serializable;
+import org.immutables.value.Value;
 
-public class BatchReadConf implements Serializable {
+@Value.Immutable
+public interface ParquetBatchReadConf extends Serializable {
+  int batchSize();
 
-  private final int parquetBatchSize;
-  private final ParquetReaderType parquetReaderType;
-  private final int orcBatchSize;
-
-  public BatchReadConf(
-      int parquetBatchSize, ParquetReaderType parquetReaderType, int orcBatchSize) {
-    this.parquetBatchSize = parquetBatchSize;
-    this.parquetReaderType = parquetReaderType;
-    this.orcBatchSize = orcBatchSize;
-  }
-
-  public int parquetBatchSize() {
-    return parquetBatchSize;
-  }
-
-  public int orcBatchSize() {
-    return orcBatchSize;
-  }
-
-  public ParquetReaderType parquetReaderType() {
-    return parquetReaderType;
-  }
+  ParquetReaderType readerType();
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index daa0ab7e21f3..8b30db3d031a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -26,7 +26,6 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
@@ -355,18 +354,11 @@ private boolean executorCacheLocalityEnabledInternal() {
         .parse();
   }
 
-  private ParquetReaderType parquetReaderType() {
+  public ParquetReaderType parquetReaderType() {
     return confParser
         .enumConf(ParquetReaderType::valueOf)
         .sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
         .defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
         .parse();
   }
-
-  public BatchReadConf batchReadConf() {
-    int parquetBatchSize = parquetBatchSize();
-    int orcBatchSize = orcBatchSize();
-    Preconditions.checkArgument(parquetBatchSize > 1 && orcBatchSize > 1, "Batch size must be > 1");
-    return new BatchReadConf(parquetBatchSize, parquetReaderType(), orcBatchSize);
-  }
 }
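One behavioral difference worth flagging in the `SparkReadConf` change: the deleted `batchReadConf()` carried a `Preconditions` guard ("Batch size must be > 1"), and nothing in the new value types re-asserts it. If that validation is still wanted, Immutables can host it on the value type itself via `@Value.Check`. A hedged sketch, not part of this patch (the `WithCheck` name is hypothetical):

```java
import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface ParquetBatchReadConfWithCheck extends Serializable {
  int batchSize();

  // Hypothetical guard restoring the dropped Preconditions check; Immutables
  // invokes @Value.Check methods after every successful build().
  @Value.Check
  default void validate() {
    if (batchSize() <= 1) {
      throw new IllegalArgumentException("Batch size must be > 1");
    }
  }
}
```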
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index a72f2cc88e46..780e1750a52e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -32,7 +32,8 @@
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.BatchReadConf;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
@@ -40,7 +41,8 @@
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
-  private final BatchReadConf conf;
+  private final ParquetBatchReadConf parquetConf;
+  private final OrcBatchReadConf orcConf;
 
   BaseBatchReader(
       Table table,
@@ -48,9 +50,11 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarB
       Schema tableSchema,
       Schema expectedSchema,
       boolean caseSensitive,
-      BatchReadConf conf) {
+      ParquetBatchReadConf parquetConf,
+      OrcBatchReadConf orcConf) {
     super(table, taskGroup, tableSchema, expectedSchema, caseSensitive);
-    this.conf = conf;
+    this.parquetConf = parquetConf;
+    this.orcConf = orcConf;
   }
 
   protected CloseableIterable<ColumnarBatch> newBatchIterable(
@@ -89,7 +93,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema -> {
-              if (conf.parquetReaderType() == ParquetReaderType.COMET) {
+              if (parquetConf.readerType() == ParquetReaderType.COMET) {
                 return VectorizedSparkParquetReaders.buildCometReader(
                     requiredSchema, fileSchema, idToConstant, deleteFilter);
               } else {
@@ -97,7 +101,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
                     requiredSchema, fileSchema, idToConstant, deleteFilter);
               }
             })
-        .recordsPerBatch(conf.parquetBatchSize())
+        .recordsPerBatch(parquetConf.batchSize())
         .filter(residual)
         .caseSensitive(caseSensitive())
         // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
@@ -127,7 +131,7 @@ private CloseableIterable<ColumnarBatch> newOrcIterable(
         .createBatchedReaderFunc(
             fileSchema ->
                 VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
-        .recordsPerBatch(conf.orcBatchSize())
+        .recordsPerBatch(orcConf.batchSize())
         .filter(residual)
         .caseSensitive(caseSensitive())
         .withNameMapping(nameMapping())
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index d74e768ca1de..983e272d75b9 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -28,7 +28,8 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.spark.BatchReadConf;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
 import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
 import org.apache.iceberg.util.SnapshotUtil;
@@ -46,14 +47,18 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
 
   private final long numSplits;
 
-  BatchDataReader(SparkInputPartition partition, BatchReadConf conf) {
+  BatchDataReader(
+      SparkInputPartition partition,
+      ParquetBatchReadConf parquetBatchReadConf,
+      OrcBatchReadConf orcBatchReadConf) {
     this(
         partition.table(),
         partition.taskGroup(),
         SnapshotUtil.schemaFor(partition.table(), partition.branch()),
         partition.expectedSchema(),
         partition.isCaseSensitive(),
-        conf);
+        parquetBatchReadConf,
+        orcBatchReadConf);
   }
 
   BatchDataReader(
@@ -62,8 +67,9 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
       Schema tableSchema,
       Schema expectedSchema,
       boolean caseSensitive,
-      BatchReadConf batchReadConf) {
-    super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, batchReadConf);
+      ParquetBatchReadConf parquetConf,
+      OrcBatchReadConf orcConf) {
+    super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, parquetConf, orcConf);
 
     numSplits = taskGroup.tasks().size();
     LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name());
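After this change the reader carries the two format-specific configs side by side instead of one combined `BatchReadConf`; only the config matching a task's file format is ever consulted, so the other may be null. A self-contained sketch of that dispatch shape (names and fields are illustrative stand-ins; the real logic lives in `newBatchIterable`, `newParquetIterable`, and `newOrcIterable` above):

```java
import org.apache.iceberg.FileFormat;

// Illustrative stand-in for BaseBatchReader's per-format dispatch: the two
// configs coexist, and the file format of the current task picks which one
// (and whose batch size) is read. The unused one is never dereferenced.
final class BatchSizeDispatch {
  private final Integer parquetBatchSize; // stands in for ParquetBatchReadConf
  private final Integer orcBatchSize; // stands in for OrcBatchReadConf

  BatchSizeDispatch(Integer parquetBatchSize, Integer orcBatchSize) {
    this.parquetBatchSize = parquetBatchSize;
    this.orcBatchSize = orcBatchSize;
  }

  int recordsPerBatch(FileFormat format) {
    switch (format) {
      case PARQUET:
        return parquetBatchSize;
      case ORC:
        return orcBatchSize;
      default:
        throw new UnsupportedOperationException("Unsupported format: " + format);
    }
  }
}
```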
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index e3b456fb7165..3f8bba5d9825 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -28,8 +28,14 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
+import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
@@ -113,17 +119,33 @@ private String[][] computePreferredLocations() {
 
   @Override
   public PartitionReaderFactory createReaderFactory() {
-    if (useParquetBatchReads()) {
-      return new SparkColumnarReaderFactory(readConf.batchReadConf());
+    if (useCometBatchReads()) {
+      return new SparkColumnarReaderFactory(
+          parquetBatchReadConf(readConf.parquetBatchSize(), ParquetReaderType.COMET));
+
+    } else if (useParquetBatchReads()) {
+      return new SparkColumnarReaderFactory(
+          parquetBatchReadConf(readConf.parquetBatchSize(), ParquetReaderType.ICEBERG));
 
     } else if (useOrcBatchReads()) {
-      return new SparkColumnarReaderFactory(readConf.batchReadConf());
+      return new SparkColumnarReaderFactory(orcBatchReadConf(readConf.orcBatchSize()));
 
     } else {
       return new SparkRowReaderFactory();
     }
   }
 
+  private ParquetBatchReadConf parquetBatchReadConf(int batchSize, ParquetReaderType readerType) {
+    return ImmutableParquetBatchReadConf.builder()
+        .batchSize(batchSize)
+        .readerType(readerType)
+        .build();
+  }
+
+  private OrcBatchReadConf orcBatchReadConf(int batchSize) {
+    return ImmutableOrcBatchReadConf.builder().batchSize(batchSize).build();
+  }
+
   // conditions for using Parquet batch reads:
   // - Parquet vectorization is enabled
   // - only primitives or metadata columns are projected
@@ -152,6 +174,17 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
   }
 
+  private boolean useCometBatchReads() {
+    return readConf.parquetVectorizationEnabled()
+        && readConf.parquetReaderType() == ParquetReaderType.COMET
+        && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
+        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
+  }
+
+  private boolean supportsCometBatchReads(Types.NestedField field) {
+    return field.type().isPrimitiveType() && !field.type().typeId().equals(Type.TypeID.UUID);
+  }
+
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
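Reader selection in `createReaderFactory()` is now a three-way choice: Comet first, then the built-in vectorized Parquet path, then ORC, falling back to row-based reads. Which Parquet implementation wins is driven by `readConf.parquetReaderType()`, which (per the `SparkReadConf` change earlier) parses `SparkSQLProperties.PARQUET_READER_TYPE` from the Spark session conf. A hedged usage sketch — the literal property key shown is an assumption to verify against `SparkSQLProperties`:

```java
import org.apache.spark.sql.SparkSession;

public class EnableCometReads {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    // Assumed key behind SparkSQLProperties.PARQUET_READER_TYPE; when unset,
    // PARQUET_READER_TYPE_DEFAULT applies and the ICEBERG reader is selected.
    spark.conf().set("spark.sql.iceberg.parquet.reader-type", "COMET");
  }
}
```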
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
index a5d1699a4886..876c0be533e0 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
@@ -20,7 +20,8 @@
 
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.spark.BatchReadConf;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -28,10 +29,15 @@
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 class SparkColumnarReaderFactory implements PartitionReaderFactory {
-  private final BatchReadConf conf;
+  private ParquetBatchReadConf parquetConf;
+  private OrcBatchReadConf orcConf;
 
-  SparkColumnarReaderFactory(BatchReadConf conf) {
-    this.conf = conf;
+  SparkColumnarReaderFactory(ParquetBatchReadConf conf) {
+    this.parquetConf = conf;
+  }
+
+  SparkColumnarReaderFactory(OrcBatchReadConf conf) {
+    this.orcConf = conf;
   }
 
   @Override
@@ -49,7 +55,7 @@ public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputP
     SparkInputPartition partition = (SparkInputPartition) inputPartition;
 
     if (partition.allTasksOfType(FileScanTask.class)) {
-      return new BatchDataReader(partition, conf);
+      return new BatchDataReader(partition, parquetConf, orcConf);
 
     } else {
       throw new UnsupportedOperationException(
           "Unsupported task group for columnar reads: " + partition.taskGroup());
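With the overloaded constructors above, each factory instance holds exactly one non-null config, and `BatchDataReader` simply receives both, never touching the null one. Nothing in the patch asserts that invariant; if desired, it could be made explicit with a guard like the following sketch (hypothetical, not in the patch):

```java
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.OrcBatchReadConf;
import org.apache.iceberg.spark.ParquetBatchReadConf;

final class BatchConfInvariant {
  private BatchConfInvariant() {}

  // Hypothetical guard: exactly one of the two configs must be present
  // before both are handed to BatchDataReader.
  static void checkExactlyOne(ParquetBatchReadConf parquetConf, OrcBatchReadConf orcConf) {
    Preconditions.checkState(
        (parquetConf != null) ^ (orcConf != null),
        "Expected exactly one of parquetConf / orcConf to be set");
  }
}
```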