
backport recent commits to 1.5 branch (#1187)
* [NSE-1170] Set correct row number in batch scan w/ partition columns (#1172)

* [NSE-1171] Throw RuntimeException when reading duplicate fields in case-insensitive mode (#1173)

* throw an exception if more than one column is matched in case-insensitive mode

* add schema check in arrow v2
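A minimal spark-shell sketch of the scenario the NSE-1171 change above guards against, assuming the arrow data source is on the classpath; the path and column names are hypothetical:

// Write a parquet file whose physical schema has two columns differing only
// in case (case-sensitive analysis is needed to pass Spark's duplicate check).
spark.conf.set("spark.sql.caseSensitive", "true")
spark.range(1).selectExpr("id AS a", "id AS A")
  .write.mode("overwrite").parquet("/tmp/dup_case_cols")

// Under case-insensitive analysis the requested field "a" now matches two
// physical columns, and the arrow reader throws a RuntimeException instead
// of silently picking one of them.
spark.conf.set("spark.sql.caseSensitive", "false")
spark.read.format("arrow").load("/tmp/dup_case_cols").select("a").show()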

* bump h2/pgsql version (#1176)

* bump h2/pgsql version

Signed-off-by: Yuan Zhou <[email protected]>

* ignore one failing test

Signed-off-by: Yuan Zhou <[email protected]>

Signed-off-by: Yuan Zhou <[email protected]>

* [NSE-956] allow to write parquet with compression (#1014)

This patch adds support for writing parquet files with a compression codec, for example:

df.coalesce(1).write.format("arrow").option("parquet.compression","zstd").save(path)

Signed-off-by: Yuan Zhou [email protected]
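Expanding the one-liner above into a slightly fuller sketch (the output path is a placeholder, and zstd is just one of the codecs parquet-mr accepts); per the ArrowFileFormat change below, the option is forwarded to ParquetOutputFormat's compression setting:

// Write through the arrow data source with an explicit parquet codec,
// then read the result back to check it round-trips.
val path = "/tmp/arrow_zstd_output"   // hypothetical location
val df = spark.range(0, 1000).toDF("id")

df.coalesce(1)
  .write
  .format("arrow")
  .option("parquet.compression", "zstd")
  .save(path)

spark.read.format("arrow").load(path).count()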

* [NSE-1161] Support read-write parquet conversion to read-write arrow (#1162)

* add ArrowConvertorExtension

* do not convert the parquet file format when writing partitioned/bucketed/sorted output

* fix cache failure

* take the write codec into account

* disable convertor extension by default

* add some comments
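A hedged sketch of how the extension is registered, using Spark's standard spark.sql.extensions mechanism; the class name comes from the new ArrowConvertorExtension file below, while the separate switch that re-enables the default-off conversion is not shown in this excerpt, so it is left out rather than guessed:

import org.apache.spark.sql.SparkSession

// Inject ArrowConvertorRule as a post-hoc resolution rule at session build time.
val spark = SparkSession.builder()
  .appName("arrow-convertor-demo")
  .config("spark.sql.extensions", "com.intel.oap.spark.sql.ArrowConvertorExtension")
  .getOrCreate()

// With the rule active (and the feature enabled), plain parquet reads and
// non-partitioned, non-bucketed writes are rewritten to use ArrowFileFormat.
spark.read.parquet("/tmp/some_parquet_table")   // hypothetical path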

* remove wrong compression type check (#1178)

Since compression has been supported since #1014, the extra compression check in ArrowConvertorExtension can be removed now.

* fix to use the right arrow branch (#1179)

Signed-off-by: Yuan Zhou <[email protected]>

* [NSE-1171] Support merging parquet schemas and reading missing columns (#1175)

* Support merging parquet schemas and reading missing columns

* fix error

* optimize null vectors

* optimize code

* optimize code

* change code

* add schema merge suite tests

* add test for struct type

* use the arrow 1.5 branch
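A hedged sketch of the read pattern this enables, assuming the arrow reader follows Spark's usual mergeSchema option and fills columns missing from older files with nulls; paths and column names are illustrative:

// Two parquet files under one table directory, written with different schemas.
spark.range(0, 10).toDF("id")
  .write.parquet("/tmp/evolving_table/part=1")
spark.range(10, 20).selectExpr("id", "CAST(id AS STRING) AS name")
  .write.parquet("/tmp/evolving_table/part=2")

// Read them through the arrow data source with schema merging: rows from the
// first file should surface a null "name" column instead of failing.
val merged = spark.read
  .format("arrow")
  .option("mergeSchema", "true")
  .load("/tmp/evolving_table")

merged.printSchema()
merged.show()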

Signed-off-by: Yuan Zhou <[email protected]>

Signed-off-by: Yuan Zhou <[email protected]>
Signed-off-by: Yuan Zhou [email protected]
Co-authored-by: Jacky Lee <[email protected]>
zhouyuan and jackylee-ch authored Dec 14, 2022
1 parent 68a5298 commit 9639322
Showing 14 changed files with 465 additions and 160 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tpch.yml
@@ -51,7 +51,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap-1.5 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2
sudo make install
6 changes: 3 additions & 3 deletions .github/workflows/unittests.yml
@@ -54,7 +54,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap-1.5 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -97,7 +97,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap-1.5 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -142,7 +142,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
cd arrow && git checkout arrow-4.0.0-oap && cd cpp
cd arrow && git checkout arrow-4.0.0-oap-1.5 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
ArrowWriteQueue.scala
@@ -37,7 +37,7 @@ import org.apache.arrow.vector.types.pojo.Schema

import org.apache.spark.internal.Logging

class ArrowWriteQueue(schema: Schema, fileFormat: FileFormat, outputFileURI: String)
class ArrowWriteQueue(schema: Schema, fileFormat: FileFormat, compressCodec: String, outputFileURI: String)
extends AutoCloseable with Logging {
private val scanner = new ScannerImpl(schema)
private val writeException = new AtomicReference[Throwable]
@@ -52,7 +52,7 @@ class ArrowWriteQueue(schema: Schema, fileFormat: FileFormat, outputFileURI: Str
val fileName = matcher.group(2)

try {
DatasetFileWriter.write(scanner, fileFormat, dirURI, Array(), 1, fileName)
DatasetFileWriter.write(scanner, fileFormat, compressCodec, dirURI, Array(), 1, fileName)
} catch {
case e: Throwable =>
writeException.set(e)
2 changes: 1 addition & 1 deletion arrow-data-source/script/build_arrow.sh
@@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}"
echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}"
mkdir -p $ARROW_SOURCE_DIR
mkdir -p $ARROW_INSTALL_DIR
git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR
git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap-1.5 $ARROW_SOURCE_DIR
pushd $ARROW_SOURCE_DIR

cmake ./cpp \
ArrowConvertorExtension.scala (new file)
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.spark.sql

import java.util.Locale

import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

class ArrowConvertorExtension extends (SparkSessionExtensions => Unit) {
def apply(e: SparkSessionExtensions): Unit = {
e.injectPostHocResolutionRule(session => ArrowConvertorRule(session))
}
}

case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write datasource path
// TODO: support writing with partitioned/bucketed/sorted column
case c: InsertIntoHadoopFsRelationCommand
if c.fileFormat.isInstanceOf[ParquetFileFormat] &&
c.partitionColumns.isEmpty && c.bucketSpec.isEmpty =>
c.copy(fileFormat = new ArrowFileFormat)

// Read path
case l@ LogicalRelation(
r@ HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _), _, _, _) =>
l.copy(relation = r.copy(fileFormat = new ArrowFileFormat)(session))

// INSERT DIR
case c: InsertIntoDataSourceDirCommand if c.provider == "parquet" =>
c.copy(provider = "arrow")
}
}
}
ArrowFileFormat.scala
@@ -28,30 +28,33 @@ import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowFilters, Arr
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowSQLConf._
import com.intel.oap.vectorized.ArrowWritableColumnVector
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkVectorUtils}
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils.UnsafeItr
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializable {
class ArrowFileFormat extends FileFormat with DataSourceRegister with Logging with Serializable {


override def isSplitable(sparkSession: SparkSession,
options: Map[String, String], path: Path): Boolean = {
options: Map[String, String], path: Path): Boolean = {
ArrowUtils.isOriginalFormatSplitable(
new ArrowOptions(new CaseInsensitiveStringMap(options.asJava).asScala.toMap))
}
@@ -61,16 +64,22 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
}

override def inferSchema(sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
convert(files, options)
}

override def prepareWrite(sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
val arrowOptions = new ArrowOptions(new CaseInsensitiveStringMap(options.asJava).asScala.toMap)
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)

val conf = ContextUtil.getConfiguration(job)
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
logInfo(s"write parquet with codec: ${parquetOptions.compressionCodecClassName}")

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
ArrowUtils.getFormat(arrowOptions) match {
@@ -84,13 +93,14 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
context: TaskAttemptContext): OutputWriter = {
val originPath = path
val writeQueue = new ArrowWriteQueue(ArrowUtils.toArrowSchema(dataSchema),
ArrowUtils.getFormat(arrowOptions), originPath)
ArrowUtils.getFormat(arrowOptions),
parquetOptions.compressionCodecClassName.toLowerCase(), originPath)

new OutputWriter {
override def write(row: InternalRow): Unit = {
val batch = row.asInstanceOf[FakeRow].batch
writeQueue.enqueue(SparkVectorUtils
.toArrowRecordBatch(batch))
.toArrowRecordBatch(batch))
}

override def close(): Unit = {
@@ -130,62 +140,79 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
// todo predicate validation / pushdown
val parquetFileFields = factory.inspect().getFields.asScala
val caseInsensitiveFieldMap = mutable.Map[String, String]()
val requiredFields = if (caseSensitive) {
new Schema(requiredSchema.map { field =>
parquetFileFields.find(_.getName.equals(field.name))
.getOrElse(ArrowUtils.toArrowField(field))
}.asJava)
} else {
new Schema(requiredSchema.map { readField =>
parquetFileFields.find(_.getName.equalsIgnoreCase(readField.name))
.map{ field =>
caseInsensitiveFieldMap += (readField.name -> field.getName)
field
}.getOrElse(ArrowUtils.toArrowField(readField))
}.asJava)
}
val dataset = factory.finish(requiredFields)
// TODO: support array/map/struct types in out-of-order schema reading.
val actualReadFields =
ArrowUtils.getRequestedField(requiredSchema, parquetFileFields, caseSensitive)

val compare = ArrowUtils.compareStringFunc(caseSensitive)
val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray
val actualReadSchema = new StructType(
actualReadFieldNames.map(f => requiredSchema.find(field => compare(f, field.name)).get))
val dataset = factory.finish(actualReadFields)

val hasMissingColumns = actualReadFields.getFields.size() != requiredSchema.size
val filter = if (enableFilterPushDown) {
ArrowFilters.translateFilters(filters, caseInsensitiveFieldMap.toMap)
val pushedFilters = if (hasMissingColumns) {
ArrowFilters.evaluateMissingFieldFilters(filters, actualReadFieldNames)
} else {
filters
}
if (pushedFilters == null) {
null
} else {
ArrowFilters.translateFilters(
pushedFilters, caseInsensitiveFieldMap.toMap)
}
} else {
org.apache.arrow.dataset.filter.Filter.EMPTY
}

val scanOptions = new ScanOptions(
requiredFields.getFields.asScala.map(f => f.getName).toArray,
filter,
batchSize)
val scanner = dataset.newScan(scanOptions)
if (filter == null) {
new Iterator[InternalRow] {
override def hasNext: Boolean = false
override def next(): InternalRow = null
}
} else {
val scanOptions = new ScanOptions(
actualReadFieldNames,
filter,
batchSize)
val scanner = dataset.newScan(scanOptions)

val taskList = scanner
val taskList = scanner
.scan()
.iterator()
.asScala
.toList
val itrList = taskList
.map(task => task.execute())

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => {
itrList.foreach(_.close())
taskList.foreach(_.close())
scanner.close()
dataset.close()
factory.close()
}))

val partitionVectors =
ArrowUtils.loadPartitionColumns(batchSize, partitionSchema, file.partitionValues)

SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]((_: TaskContext) => {
partitionVectors.foreach(_.close())
})

val itr = itrList
.toIterator
.flatMap(itr => itr.asScala)
.map(batch => ArrowUtils.loadBatch(batch, requiredSchema, partitionVectors))
new UnsafeItr(itr).asInstanceOf[Iterator[InternalRow]]
val itrList = taskList
.map(task => task.execute())

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => {
itrList.foreach(_.close())
taskList.foreach(_.close())
scanner.close()
dataset.close()
factory.close()
}))

val partitionVectors =
ArrowUtils.loadPartitionColumns(batchSize, partitionSchema, file.partitionValues)

val nullVectors = if (hasMissingColumns) {
val missingSchema =
new StructType(requiredSchema.filterNot(actualReadSchema.contains).toArray)
ArrowUtils.loadMissingColumns(batchSize, missingSchema)
} else {
Array.empty[ArrowWritableColumnVector]
}

val itr = itrList
.toIterator
.flatMap(itr => itr.asScala)
.map(batch => ArrowUtils.loadBatch(
batch, actualReadSchema, requiredSchema, partitionVectors, nullVectors))
new UnsafeItr(itr).asInstanceOf[Iterator[InternalRow]]
}
}
}

@@ -197,6 +224,10 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
}

override def shortName(): String = "arrow"

override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[ArrowFileFormat]
}

object ArrowFileFormat {