From 7d739312b564dde6a2373b0c808a6fd772b96881 Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Wed, 13 Oct 2021 18:18:51 +0800 Subject: [PATCH] Support nested types in ORC writer (#3696) * Support structs/lists in ORC writer Signed-off-by: Firestarman * Comment update Signed-off-by: Firestarman * Address the comment Signed-off-by: Firestarman * ORC and Parquet share the same method to build options. Signed-off-by: Firestarman * Add map support in orc writer Nested map is not supported yet. Signed-off-by: Firestarman * Address comments Signed-off-by: Firestarman * Fix a build error Signed-off-by: Firestarman * New map gens for orc write Try to fix a build error in premerge builds which run tests in parallel. Signed-off-by: Firestarman * Remove map support. Because map tests failed in premerge builds where tests run in parallel. Signed-off-by: Firestarman --- docs/supported_ops.md | 4 +- .../src/main/python/orc_write_test.py | 22 ++++-- .../v2/ParquetCachedBatchSerializer.scala | 4 +- .../com/nvidia/spark/rapids/DumpUtils.scala | 15 ++-- .../nvidia/spark/rapids/GpuOverrides.scala | 3 +- .../spark/rapids/GpuParquetFileFormat.scala | 69 +------------------ .../com/nvidia/spark/rapids/SchemaUtils.scala | 69 ++++++++++++++++++- .../spark/sql/rapids/GpuOrcFileFormat.scala | 17 ++--- .../sql/rapids/collectionOperations.scala | 2 +- 9 files changed, 106 insertions(+), 99 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index e37d6ebe693..a9e6566c632 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -17213,9 +17213,9 @@ dates or timestamps, or for a lack of type coercion support. NS +PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types DECIMAL, BINARY, MAP, UDT
NS -NS -NS +PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types DECIMAL, BINARY, MAP, UDT
NS diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py index c155ba955dc..7da779c4143 100644 --- a/integration_tests/src/main/python/orc_write_test.py +++ b/integration_tests/src/main/python/orc_write_test.py @@ -20,10 +20,24 @@ from marks import * from pyspark.sql.types import * -orc_write_gens_list = [ - [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, - string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))], +orc_write_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, + string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), + TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))] + +orc_write_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_write_basic_gens)]) + +orc_write_struct_gens_sample = [orc_write_basic_struct_gen, + StructGen([['child0', byte_gen], ['child1', orc_write_basic_struct_gen]]), + StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])] + +orc_write_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_write_basic_gens] + [ + ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10), + ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10), + ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))] + +orc_write_gens_list = [orc_write_basic_gens, + orc_write_struct_gens_sample, + orc_write_array_gens_sample, pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/139')), pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/140'))] diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala index 
9d6c92a264a..613c7fcf290 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala @@ -435,8 +435,8 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm { table: Table, schema: StructType): ParquetBufferConsumer = { val buffer = new ParquetBufferConsumer(table.getRowCount.toInt) - val opts = GpuParquetFileFormat - .parquetWriterOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false) + val opts = SchemaUtils + .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false) .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build() withResource(Table.writeParquetChunked(opts, buffer)) { writer => writer.write(table) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index 0f676203be8..33c29a6673d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -21,11 +21,8 @@ import java.util.Random import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ - ColumnView, CompressionType, DType, HostBufferConsumer, HostMemoryBuffer, - ParquetColumnWriterOptions, ParquetWriterOptions, Table, TableWriter -} -import ai.rapids.cudf.ParquetColumnWriterOptions.{listBuilder, structBuilder, NestedBuilder} +import ai.rapids.cudf._ +import ai.rapids.cudf.ColumnWriterOptions._ import org.apache.spark.internal.Logging import org.apache.spark.sql.vectorized.ColumnarBatch @@ -144,8 +141,8 @@ private class ColumnIndex() { object ParquetDumper extends Arm { val COMPRESS_TYPE = CompressionType.SNAPPY - def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions]( - builder: 
ParquetColumnWriterOptions.NestedBuilder[T, V], + def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ColumnWriterOptions]( + builder: ColumnWriterOptions.NestedBuilder[T, V], table: Table): T = { val cIndex = new ColumnIndex @@ -159,8 +156,8 @@ object ParquetDumper extends Arm { } private def parquetWriterOptionsFromColumnView[T <: NestedBuilder[_, _], - V <: ParquetColumnWriterOptions]( - builder: ParquetColumnWriterOptions.NestedBuilder[T, V], + V <: ColumnWriterOptions]( + builder: ColumnWriterOptions.NestedBuilder[T, V], cv: ColumnView, cIndex: ColumnIndex, toClose: ArrayBuffer[ColumnView]): T = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 8fc7c86e98b..8dd0c323f5b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -852,7 +852,8 @@ object GpuOverrides extends Logging { (OrcFormatType, FileFormatChecks( cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.DECIMAL_64 + TypeSig.STRUCT + TypeSig.MAP).nested(), - cudfWrite = TypeSig.commonCudfTypes, + cudfWrite = (TypeSig.commonCudfTypes + TypeSig.ARRAY + + TypeSig.STRUCT).nested(), sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested()))) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 89a305fa1d4..d679af86ea7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -17,7 +17,6 @@ package com.nvidia.spark.rapids import ai.rapids.cudf._ -import ai.rapids.cudf.ParquetColumnWriterOptions._ import com.nvidia.spark.RebaseHelper import org.apache.hadoop.mapreduce.{Job, OutputCommitter, 
TaskAttemptContext} import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} @@ -33,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.rapids.ColumnarWriteTaskStatsTracker import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DateType, Decimal, DecimalType, MapType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch object GpuParquetFileFormat { @@ -117,68 +116,6 @@ object GpuParquetFileFormat { } } - def parquetWriterOptionsFromField[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions]( - builder: ParquetColumnWriterOptions.NestedBuilder[T, V], - dataType: DataType, - name: String, - writeInt96: Boolean, - nullable: Boolean): T = { - dataType match { - case dt: DecimalType => - builder.withDecimalColumn(name, dt.precision, nullable) - case TimestampType => - builder.withTimestampColumn(name, writeInt96, nullable) - case s: StructType => - builder.withStructColumn( - parquetWriterOptionsFromSchema( - // we are setting this to nullable, in case the parent is a Map's key and wants to - // set this to false - structBuilder(name, nullable), - s, - writeInt96).build()) - case a: ArrayType => - builder.withListColumn( - parquetWriterOptionsFromField( - // we are setting this to nullable, in case the parent is a Map's key and wants to - // set this to false - listBuilder(name, nullable), - a.elementType, - name, - writeInt96, - true).build()) - case m: MapType => - builder.withMapColumn( - mapColumn(name, - parquetWriterOptionsFromField( - ParquetWriterOptions.builder(), - m.keyType, - "key", - writeInt96, - false).build().getChildColumnOptions()(0), - parquetWriterOptionsFromField( - ParquetWriterOptions.builder(), - m.valueType, - "value", - writeInt96, - 
nullable).build().getChildColumnOptions()(0))) - case _ => - builder.withColumns(nullable, name) - } - builder.asInstanceOf[T] - } - - def parquetWriterOptionsFromSchema[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions]( - builder: ParquetColumnWriterOptions.NestedBuilder[T, V], - schema: StructType, - writeInt96: Boolean): T = { - // TODO once https://github.com/rapidsai/cudf/issues/7654 is fixed go back to actually - // setting if the output is nullable or not everywhere we have hard-coded nullable=true - schema.foreach(field => - parquetWriterOptionsFromField(builder, field.dataType, field.name, writeInt96, true) - ) - builder.asInstanceOf[T] - } - def parseCompressionType(compressionType: String): Option[CompressionType] = { compressionType match { case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE) @@ -401,8 +338,8 @@ class GpuParquetWriter( override val tableWriter: TableWriter = { val writeContext = new ParquetWriteSupport().init(conf) - val builder = GpuParquetFileFormat - .parquetWriterOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema, + val builder = SchemaUtils + .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema, ParquetOutputTimestampType.INT96 == SQLConf.get.parquetOutputTimestampType) .withMetadata(writeContext.getExtraMetaData) .withCompressionType(compressionType) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala index 1531ff440b7..e535d1d9f9a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala @@ -21,7 +21,8 @@ import java.util.Optional import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions -import ai.rapids.cudf.{ColumnView, Table} +import ai.rapids.cudf._ +import ai.rapids.cudf.ColumnWriterOptions._ import 
com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq import org.apache.orc.TypeDescription @@ -208,4 +209,70 @@ object SchemaUtils extends Arm { case _ => col } } + + private def writerOptionsFromField[T <: NestedBuilder[_, _], V <: ColumnWriterOptions]( + builder: NestedBuilder[T, V], + dataType: DataType, + name: String, + nullable: Boolean, + writeInt96: Boolean): T = { + dataType match { + case dt: DecimalType => + builder.withDecimalColumn(name, dt.precision, nullable) + case TimestampType => + builder.withTimestampColumn(name, writeInt96, nullable) + case s: StructType => + builder.withStructColumn( + writerOptionsFromSchema( + structBuilder(name, nullable), + s, + writeInt96).build()) + case a: ArrayType => + builder.withListColumn( + writerOptionsFromField( + listBuilder(name, nullable), + a.elementType, + name, + a.containsNull, + writeInt96).build()) + case m: MapType => + // It is ok to use `StructBuilder` here for key and value, since either + // `OrcWriterOptions.Builder` or `ParquetWriterOptions.Builder` is actually an + // `AbstractStructBuilder`, and here only handles the common column metadata things. + builder.withMapColumn( + mapColumn(name, + writerOptionsFromField( + structBuilder(name, nullable), + m.keyType, + "key", + nullable = false, + writeInt96).build().getChildColumnOptions()(0), + writerOptionsFromField( + structBuilder(name, nullable), + m.valueType, + "value", + m.valueContainsNull, + writeInt96).build().getChildColumnOptions()(0))) + case _ => + builder.withColumns(nullable, name) + } + builder.asInstanceOf[T] + } + + /** + * Build writer options from schema for both ORC and Parquet writers. + * + * (There is an open issue "https://github.com/rapidsai/cudf/issues/7654" for Parquet writer, + * but it is circumvented by https://github.com/rapidsai/cudf/pull/9061, so the nullable can + * go back to the actual setting, instead of the hard-coded nullable=true before.) 
+ */ + def writerOptionsFromSchema[T <: NestedBuilder[_, _], V <: ColumnWriterOptions]( + builder: NestedBuilder[T, V], + schema: StructType, + writeInt96: Boolean = false): T = { + schema.foreach(field => + writerOptionsFromField(builder, field.dataType, field.name, field.nullable, writeInt96) + ) + builder.asInstanceOf[T] + } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala index c4f0f95b43b..3baa6ca2386 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcUtils} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ object GpuOrcFileFormat extends Logging { // The classname used when Spark is configured to use the Hive implementation for ORC. 
@@ -161,18 +161,9 @@ class GpuOrcWriter(path: String, extends ColumnarOutputWriter(path, context, dataSchema, "ORC") { override val tableWriter: TableWriter = { - val builder= ORCWriterOptions.builder() + val builder = SchemaUtils + .writerOptionsFromSchema(ORCWriterOptions.builder(), dataSchema) .withCompressionType(CompressionType.valueOf(OrcConf.COMPRESS.getString(conf))) - - dataSchema.foreach(entry => { - if (entry.nullable) { - builder.withColumnNames(entry.name) - } else { - builder.withNotNullableColumnNames(entry.name) - } - }) - - val options = builder.build() - Table.writeORCChunked(options, this) + Table.writeORCChunked(builder.build(), this) } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala index a1a51944f1a..beec3b4c6d4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf -import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetColumnWriterOptions, ParquetWriterOptions, Scalar} +import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetWriterOptions, Scalar} import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuListUtils, GpuLiteral, GpuMapUtils, GpuScalar, GpuUnaryExpression} import com.nvidia.spark.rapids.GpuExpressionsUtils.columnarEvalToColumn import com.nvidia.spark.rapids.RapidsPluginImplicits._