Support nested types in ORC writer (#3696)
* Support structs/lists in ORC writer

Signed-off-by: Firestarman <[email protected]>

* Comment update

Signed-off-by: Firestarman <[email protected]>

* Address the comment

Signed-off-by: Firestarman <[email protected]>

* ORC and Parquet share the same method to build options.

Signed-off-by: Firestarman <[email protected]>

* Add map support in orc writer

Nested maps are not supported yet.

Signed-off-by: Firestarman <[email protected]>

* Address comments

Signed-off-by: Firestarman <[email protected]>

* Fix a build error

Signed-off-by: Firestarman <[email protected]>

* New map gens for orc write

Try to fix a build error in premerge builds, which run tests in parallel.

Signed-off-by: Firestarman <[email protected]>

* Remove map support.

Removed because map tests failed in premerge builds, where tests run in parallel.

Signed-off-by: Firestarman <[email protected]>
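In short: after this change the GPU ORC writer accepts nested structs and arrays, with map support backed out by the last two commits above. A minimal sketch of the kind of write this enables, assuming a Spark session with the RAPIDS plugin enabled (the path, app name, and column names are illustrative, not taken from the diff):

import org.apache.spark.sql.SparkSession

// Hypothetical smoke test: write struct and array columns (no maps) to ORC.
val spark = SparkSession.builder().appName("orc-nested-write-demo").getOrCreate()
val df = spark.range(10).selectExpr(
  "named_struct('id', id, 'arr', array(id, id + 1)) AS s",  // struct with a nested array
  "array(cast(id AS STRING)) AS a")                         // top-level array column
df.write.mode("overwrite").orc("/tmp/orc_nested_write_demo")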
firestarman authored Oct 13, 2021
1 parent 738c95f commit 7d73931
Showing 9 changed files with 106 additions and 99 deletions.
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -17213,9 +17213,9 @@ dates or timestamps, or for a lack of type coercion support.
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
22 changes: 18 additions & 4 deletions integration_tests/src/main/python/orc_write_test.py
@@ -20,10 +20,24 @@
from marks import *
from pyspark.sql.types import *

-orc_write_gens_list = [
-    [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
-     string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
-     TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))],
+orc_write_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
+                        string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
+                        TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]
+
+orc_write_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_write_basic_gens)])
+
+orc_write_struct_gens_sample = [orc_write_basic_struct_gen,
+    StructGen([['child0', byte_gen], ['child1', orc_write_basic_struct_gen]]),
+    StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
+
+orc_write_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_write_basic_gens] + [
+    ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
+    ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
+    ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]
+
+orc_write_gens_list = [orc_write_basic_gens,
+    orc_write_struct_gens_sample,
+    orc_write_array_gens_sample,
    pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/139')),
    pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/140'))]

@@ -435,8 +435,8 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
table: Table,
schema: StructType): ParquetBufferConsumer = {
val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
-    val opts = GpuParquetFileFormat
-      .parquetWriterOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false)
+    val opts = SchemaUtils
+      .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false)
.withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build()
withResource(Table.writeParquetChunked(opts, buffer)) { writer =>
writer.write(table)
@@ -21,11 +21,8 @@ import java.util.Random
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

-import ai.rapids.cudf.{
-  ColumnView, CompressionType, DType, HostBufferConsumer, HostMemoryBuffer,
-  ParquetColumnWriterOptions, ParquetWriterOptions, Table, TableWriter
-}
-import ai.rapids.cudf.ParquetColumnWriterOptions.{listBuilder, structBuilder, NestedBuilder}
+import ai.rapids.cudf._
+import ai.rapids.cudf.ColumnWriterOptions._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -144,8 +141,8 @@ private class ColumnIndex() {
object ParquetDumper extends Arm {
val COMPRESS_TYPE = CompressionType.SNAPPY

-  def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+  def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
table: Table): T = {

val cIndex = new ColumnIndex
@@ -159,8 +156,8 @@ object ParquetDumper extends Arm {
}

private def parquetWriterOptionsFromColumnView[T <: NestedBuilder[_, _],
-      V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+      V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
cv: ColumnView,
cIndex: ColumnIndex,
toClose: ArrayBuffer[ColumnView]): T = {
@@ -852,7 +852,8 @@ object GpuOverrides extends Logging {
(OrcFormatType, FileFormatChecks(
cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.DECIMAL_64 +
TypeSig.STRUCT + TypeSig.MAP).nested(),
-      cudfWrite = TypeSig.commonCudfTypes,
+      cudfWrite = (TypeSig.commonCudfTypes + TypeSig.ARRAY +
+        TypeSig.STRUCT).nested(),
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())))

@@ -17,7 +17,6 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf._
-import ai.rapids.cudf.ParquetColumnWriterOptions._
import com.nvidia.spark.RebaseHelper
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
@@ -33,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.rapids.ColumnarWriteTaskStatsTracker
import org.apache.spark.sql.rapids.execution.TrampolineUtil
-import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DateType, Decimal, DecimalType, MapType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuParquetFileFormat {
@@ -117,68 +116,6 @@
}
}

-  def parquetWriterOptionsFromField[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
-      dataType: DataType,
-      name: String,
-      writeInt96: Boolean,
-      nullable: Boolean): T = {
-    dataType match {
-      case dt: DecimalType =>
-        builder.withDecimalColumn(name, dt.precision, nullable)
-      case TimestampType =>
-        builder.withTimestampColumn(name, writeInt96, nullable)
-      case s: StructType =>
-        builder.withStructColumn(
-          parquetWriterOptionsFromSchema(
-            // we are setting this to nullable, in case the parent is a Map's key and wants to
-            // set this to false
-            structBuilder(name, nullable),
-            s,
-            writeInt96).build())
-      case a: ArrayType =>
-        builder.withListColumn(
-          parquetWriterOptionsFromField(
-            // we are setting this to nullable, in case the parent is a Map's key and wants to
-            // set this to false
-            listBuilder(name, nullable),
-            a.elementType,
-            name,
-            writeInt96,
-            true).build())
-      case m: MapType =>
-        builder.withMapColumn(
-          mapColumn(name,
-            parquetWriterOptionsFromField(
-              ParquetWriterOptions.builder(),
-              m.keyType,
-              "key",
-              writeInt96,
-              false).build().getChildColumnOptions()(0),
-            parquetWriterOptionsFromField(
-              ParquetWriterOptions.builder(),
-              m.valueType,
-              "value",
-              writeInt96,
-              nullable).build().getChildColumnOptions()(0)))
-      case _ =>
-        builder.withColumns(nullable, name)
-    }
-    builder.asInstanceOf[T]
-  }
-
-  def parquetWriterOptionsFromSchema[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
-      schema: StructType,
-      writeInt96: Boolean): T = {
-    // TODO once https://github.com/rapidsai/cudf/issues/7654 is fixed go back to actually
-    // setting if the output is nullable or not everywhere we have hard-coded nullable=true
-    schema.foreach(field =>
-      parquetWriterOptionsFromField(builder, field.dataType, field.name, writeInt96, true)
-    )
-    builder.asInstanceOf[T]
-  }
-
def parseCompressionType(compressionType: String): Option[CompressionType] = {
compressionType match {
case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE)
@@ -401,8 +338,8 @@ class GpuParquetWriter(

override val tableWriter: TableWriter = {
val writeContext = new ParquetWriteSupport().init(conf)
-    val builder = GpuParquetFileFormat
-      .parquetWriterOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
+    val builder = SchemaUtils
+      .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
ParquetOutputTimestampType.INT96 == SQLConf.get.parquetOutputTimestampType)
.withMetadata(writeContext.getExtraMetaData)
.withCompressionType(compressionType)
@@ -21,7 +21,8 @@ import java.util.Optional
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions

-import ai.rapids.cudf.{ColumnView, Table}
+import ai.rapids.cudf._
+import ai.rapids.cudf.ColumnWriterOptions._
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import org.apache.orc.TypeDescription

@@ -208,4 +209,70 @@ object SchemaUtils extends Arm {
case _ => col
}
}

+  private def writerOptionsFromField[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: NestedBuilder[T, V],
+      dataType: DataType,
+      name: String,
+      nullable: Boolean,
+      writeInt96: Boolean): T = {
+    dataType match {
+      case dt: DecimalType =>
+        builder.withDecimalColumn(name, dt.precision, nullable)
+      case TimestampType =>
+        builder.withTimestampColumn(name, writeInt96, nullable)
+      case s: StructType =>
+        builder.withStructColumn(
+          writerOptionsFromSchema(
+            structBuilder(name, nullable),
+            s,
+            writeInt96).build())
+      case a: ArrayType =>
+        builder.withListColumn(
+          writerOptionsFromField(
+            listBuilder(name, nullable),
+            a.elementType,
+            name,
+            a.containsNull,
+            writeInt96).build())
+      case m: MapType =>
+        // It is OK to use `structBuilder` here for the key and value, since both
+        // `ORCWriterOptions.Builder` and `ParquetWriterOptions.Builder` are in fact
+        // `AbstractStructBuilder`s, and this code only handles the common column metadata.
+        builder.withMapColumn(
+          mapColumn(name,
+            writerOptionsFromField(
+              structBuilder(name, nullable),
+              m.keyType,
+              "key",
+              nullable = false,
+              writeInt96).build().getChildColumnOptions()(0),
+            writerOptionsFromField(
+              structBuilder(name, nullable),
+              m.valueType,
+              "value",
+              m.valueContainsNull,
+              writeInt96).build().getChildColumnOptions()(0)))
+      case _ =>
+        builder.withColumns(nullable, name)
+    }
+    builder.asInstanceOf[T]
+  }
+
+  /**
+   * Build writer options from a schema, for both the ORC and Parquet writers.
+   *
+   * (There is an open issue https://github.com/rapidsai/cudf/issues/7654 for the Parquet
+   * writer, but it is worked around by https://github.com/rapidsai/cudf/pull/9061, so
+   * nullability can now follow the actual schema instead of the previously hard-coded
+   * nullable=true.)
+   */
+  def writerOptionsFromSchema[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: NestedBuilder[T, V],
+      schema: StructType,
+      writeInt96: Boolean = false): T = {
+    schema.foreach(field =>
+      writerOptionsFromField(builder, field.dataType, field.name, field.nullable, writeInt96)
+    )
+    builder.asInstanceOf[T]
+  }
}
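To make the recursion in writerOptionsFromField concrete: for a single column a: ARRAY<STRUCT<x: INT, y: STRING>> with every level nullable, writerOptionsFromSchema builds the same options one would write by hand. A minimal sketch, assuming the cudf ColumnWriterOptions builder API exactly as imported above (the schema is illustrative):

import ai.rapids.cudf.ORCWriterOptions
import ai.rapids.cudf.ColumnWriterOptions._

// Hand-expanded equivalent of writerOptionsFromSchema for
// a: ARRAY<STRUCT<x: INT, y: STRING>>, all levels nullable.
val opts = ORCWriterOptions.builder()
  .withListColumn(
    listBuilder("a", true)           // the list column carries the field's nullability
      .withStructColumn(
        structBuilder("a", true)     // element struct reuses the column name
          .withColumns(true, "x")    // plain children: withColumns(nullable, name)
          .withColumns(true, "y")
          .build())
      .build())
  .build()

For MapType, the method instead wraps two child options built the same way in mapColumn(name, keyOptions, valueOptions), forcing the key to be non-nullable.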
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcUtils}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._

object GpuOrcFileFormat extends Logging {
// The classname used when Spark is configured to use the Hive implementation for ORC.
@@ -161,18 +161,9 @@ class GpuOrcWriter(path: String,
extends ColumnarOutputWriter(path, context, dataSchema, "ORC") {

override val tableWriter: TableWriter = {
-    val builder = ORCWriterOptions.builder()
+    val builder = SchemaUtils
+      .writerOptionsFromSchema(ORCWriterOptions.builder(), dataSchema)
       .withCompressionType(CompressionType.valueOf(OrcConf.COMPRESS.getString(conf)))
-
-    dataSchema.foreach(entry => {
-      if (entry.nullable) {
-        builder.withColumnNames(entry.name)
-      } else {
-        builder.withNotNullableColumnNames(entry.name)
-      }
-    })
-
-    val options = builder.build()
-    Table.writeORCChunked(options, this)
+    Table.writeORCChunked(builder.build(), this)
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
-import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetColumnWriterOptions, ParquetWriterOptions, Scalar}
+import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetWriterOptions, Scalar}
import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuListUtils, GpuLiteral, GpuMapUtils, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.GpuExpressionsUtils.columnarEvalToColumn
import com.nvidia.spark.rapids.RapidsPluginImplicits._
