Support nested types in ORC writer #3696

Merged
13 commits merged on Oct 13, 2021
6 changes: 3 additions & 3 deletions docs/supported_ops.md
@@ -17213,9 +17213,9 @@ dates or timestamps, or for a lack of type coercion support.
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
25 changes: 21 additions & 4 deletions integration_tests/src/main/python/orc_write_test.py
@@ -18,12 +18,29 @@
from datetime import date, datetime, timezone
from data_gen import *
from marks import *
from orc_test import orc_basic_map_gens as orc_write_basic_map_gens
from pyspark.sql.types import *

orc_write_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))],
orc_write_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]

orc_write_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_write_basic_gens)])

# Some array/struct gens, but not all because of nesting
Collaborator: Mind expanding on this comment a bit, why "not all because of nesting"?

Collaborator Author: Good suggestion, I will update it in a follow-up PR since this PR should be merged as soon as possible.

Collaborator Author: I removed this comment.

orc_write_struct_gens_sample = [orc_write_basic_struct_gen,
StructGen([['child0', byte_gen], ['child1', orc_write_basic_struct_gen]]),
StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]

orc_write_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_write_basic_gens] + [
ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]

orc_write_gens_list = [orc_write_basic_gens,
orc_write_struct_gens_sample,
orc_write_array_gens_sample,
orc_write_basic_map_gens,
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/139')),
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/140'))]

@@ -21,11 +21,8 @@ import java.util.Random
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{
ColumnView, CompressionType, DType, HostBufferConsumer, HostMemoryBuffer,
ParquetColumnWriterOptions, ParquetWriterOptions, Table, TableWriter
}
import ai.rapids.cudf.ParquetColumnWriterOptions.{listBuilder, structBuilder, NestedBuilder}
import ai.rapids.cudf._
import ai.rapids.cudf.ColumnWriterOptions._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -144,8 +141,8 @@ private class ColumnIndex() {
object ParquetDumper extends Arm {
val COMPRESS_TYPE = CompressionType.SNAPPY

def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
builder: ColumnWriterOptions.NestedBuilder[T, V],
table: Table): T = {

val cIndex = new ColumnIndex
@@ -159,8 +156,8 @@ object ParquetDumper extends Arm {
}

private def parquetWriterOptionsFromColumnView[T <: NestedBuilder[_, _],
V <: ParquetColumnWriterOptions](
builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
V <: ColumnWriterOptions](
builder: ColumnWriterOptions.NestedBuilder[T, V],
cv: ColumnView,
cIndex: ColumnIndex,
toClose: ArrayBuffer[ColumnView]): T = {
@@ -852,7 +852,8 @@ object GpuOverrides extends Logging {
(OrcFormatType, FileFormatChecks(
cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.DECIMAL_64 +
TypeSig.STRUCT + TypeSig.MAP).nested(),
cudfWrite = TypeSig.commonCudfTypes,
cudfWrite = (TypeSig.commonCudfTypes + TypeSig.ARRAY +
Collaborator (@abellina, Oct 13, 2021): Also, I get confused by TypeSig. So we can read nested maps, but not write them? Or did I misunderstand? Also, is there a follow-on issue to handle the same types that the reader supports?

Collaborator Author: Yes, that's correct. We can read nested maps but cannot write them, which is a limitation of the cudf native ORC writer.
I will file an issue against cudf first. Once cudf supports it, we can update the TypeSig here.

Collaborator Author: I have now removed the map support due to test failures that only happened in pre-merge builds.
We have issue #3784 to track this.

TypeSig.STRUCT).nested() + TypeSig.MAP,
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())))

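The exchange above comes down to an asymmetry: the ORC reader accepts maps anywhere in a schema, while the write side shown here only accepts a map at the top level and, per supported_ops.md, no DECIMAL or BINARY children, because that is what the cudf native ORC writer could handle (and the comment above notes that map support was later pulled from this PR entirely). As a rough, self-contained illustration of that write-side rule, the sketch below uses plain Spark SQL types rather than the plugin's TypeSig machinery; the object and method names are hypothetical.

import org.apache.spark.sql.types._

// Hypothetical illustration of the rule expressed by
// (commonCudfTypes + ARRAY + STRUCT).nested() + MAP: arrays and structs may nest,
// a map is only accepted at the top level, and DECIMAL/BINARY children are rejected.
object OrcWriteSupportSketch {
  private def childOk(dt: DataType): Boolean = dt match {
    case _: MapType         => false            // no nested maps
    case _: DecimalType     => false            // unsupported child type
    case BinaryType         => false            // unsupported child type
    case ArrayType(elem, _) => childOk(elem)
    case StructType(fields) => fields.forall(f => childOk(f.dataType))
    case _                  => true
  }

  def schemaOk(schema: StructType): Boolean = schema.fields.forall { field =>
    field.dataType match {
      case MapType(keyType, valueType, _) => childOk(keyType) && childOk(valueType)
      case other                          => childOk(other)
    }
  }

  def main(args: Array[String]): Unit = {
    val topLevelMap = StructType(Seq(
      StructField("m", MapType(StringType, IntegerType))))
    val nestedMap = StructType(Seq(
      StructField("a", ArrayType(MapType(StringType, IntegerType)))))
    println(schemaOk(topLevelMap)) // true: map at the top level
    println(schemaOk(nestedMap))   // false: map nested inside an array
  }
}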
@@ -17,7 +17,6 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf._
import ai.rapids.cudf.ParquetColumnWriterOptions._
import com.nvidia.spark.RebaseHelper
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
@@ -33,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.rapids.ColumnarWriteTaskStatsTracker
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DateType, Decimal, DecimalType, MapType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuParquetFileFormat {
@@ -117,68 +116,6 @@
}
}

def parquetWriterOptionsFromField[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
dataType: DataType,
name: String,
writeInt96: Boolean,
nullable: Boolean): T = {
dataType match {
case dt: DecimalType =>
builder.withDecimalColumn(name, dt.precision, nullable)
case TimestampType =>
builder.withTimestampColumn(name, writeInt96, nullable)
case s: StructType =>
builder.withStructColumn(
parquetWriterOptionsFromSchema(
// we are setting this to nullable, in case the parent is a Map's key and wants to
// set this to false
structBuilder(name, nullable),
s,
writeInt96).build())
case a: ArrayType =>
builder.withListColumn(
parquetWriterOptionsFromField(
// we are setting this to nullable, in case the parent is a Map's key and wants to
// set this to false
listBuilder(name, nullable),
a.elementType,
name,
writeInt96,
true).build())
case m: MapType =>
builder.withMapColumn(
mapColumn(name,
parquetWriterOptionsFromField(
ParquetWriterOptions.builder(),
m.keyType,
"key",
writeInt96,
false).build().getChildColumnOptions()(0),
parquetWriterOptionsFromField(
ParquetWriterOptions.builder(),
m.valueType,
"value",
writeInt96,
nullable).build().getChildColumnOptions()(0)))
case _ =>
builder.withColumns(nullable, name)
}
builder.asInstanceOf[T]
}

def parquetWriterOptionsFromSchema[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
schema: StructType,
writeInt96: Boolean): T = {
// TODO once https://github.com/rapidsai/cudf/issues/7654 is fixed go back to actually
// setting if the output is nullable or not everywhere we have hard-coded nullable=true
schema.foreach(field =>
parquetWriterOptionsFromField(builder, field.dataType, field.name, writeInt96, true)
)
builder.asInstanceOf[T]
}

def parseCompressionType(compressionType: String): Option[CompressionType] = {
compressionType match {
case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE)
@@ -401,8 +338,8 @@ class GpuParquetWriter(

override val tableWriter: TableWriter = {
val writeContext = new ParquetWriteSupport().init(conf)
val builder = GpuParquetFileFormat
.parquetWriterOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
val builder = SchemaUtils
.writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
ParquetOutputTimestampType.INT96 == SQLConf.get.parquetOutputTimestampType)
.withMetadata(writeContext.getExtraMetaData)
.withCompressionType(compressionType)
@@ -21,7 +21,8 @@ import java.util.Optional
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions

import ai.rapids.cudf.{ColumnView, Table}
import ai.rapids.cudf._
Collaborator: I don't think it's a good idea to import everything from cudf.

Collaborator Author: Good point, but this was done by the IDE; if you prefer, I can change it back.

Collaborator: Not really. Since it's a breaking change, let's merge it as soon as possible.

import ai.rapids.cudf.ColumnWriterOptions._
Collaborator: I didn't find ColumnWriterOptions on the cudf side. Is there a pending PR?

Collaborator Author (@firestarman, Oct 12, 2021): Yes, it depends on rapidsai/cudf#9334. I will merge that once this PR gets approvals.

import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import org.apache.orc.TypeDescription

@@ -208,4 +209,70 @@ object SchemaUtils extends Arm {
case _ => col
}
}

private def writerOptionsFromField[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
builder: NestedBuilder[T, V],
dataType: DataType,
name: String,
nullable: Boolean,
writeInt96: Boolean): T = {
dataType match {
case dt: DecimalType =>
builder.withDecimalColumn(name, dt.precision, nullable)
case TimestampType =>
builder.withTimestampColumn(name, writeInt96, nullable)
case s: StructType =>
Collaborator: The previous version of this code for struct and array defaulted both to nullable (and had a comment that is missing in your change):

// we are setting this to nullable, in case the parent is a Map's key and wants to
// set this to false

Why don't these columns need to be set nullable?

Collaborator Author: We discussed this above in response to Zara's comments.
I moved a similar comment to the beginning of the writerOptionsFromSchema method.

Previously this was set to nullable, with that comment, because nullable was hard-coded to true due to rapidsai/cudf#7654. That issue has since been circumvented by rapidsai/cudf#9061, so we can keep it nullable, but the comment is no longer needed.

Collaborator Author (@firestarman, Oct 13, 2021): You can see that structBuilder and listBuilder still use nullable for the parent columns, while for child columns we use containsNull for arrays and valueContainsNull for maps to tell whether the children are nullable. (A short sketch of these rules follows this file's changes.)

builder.withStructColumn(
writerOptionsFromSchema(
structBuilder(name, nullable),
s,
writeInt96).build())
case a: ArrayType =>
builder.withListColumn(
writerOptionsFromField(
listBuilder(name, nullable),
a.elementType,
name,
a.containsNull,
writeInt96).build())
case m: MapType =>
// It is ok to use `StructBuilder` here for key and value, since either
// `OrcWriterOptions.Builder` or `ParquetWriterOptions.Builder` is actually an
// `AbstractStructBuilder`, and here only handles the common column metadata things.
builder.withMapColumn(
mapColumn(name,
writerOptionsFromField(
structBuilder(name, nullable),
m.keyType,
"key",
nullable = false,
writeInt96).build().getChildColumnOptions()(0),
writerOptionsFromField(
structBuilder(name, nullable),
m.valueType,
"value",
m.valueContainsNull,
writeInt96).build().getChildColumnOptions()(0)))
case _ =>
builder.withColumns(nullable, name)
}
builder.asInstanceOf[T]
}

/**
* Build writer options from schema for both ORC and Parquet writers.
*
* (There is an open issue "https://github.com/rapidsai/cudf/issues/7654" for Parquet writer,
* but it is circumvented by https://github.com/rapidsai/cudf/pull/9061, so the nullable can
* go back to the actual setting, instead of the hard-coded nullable=true before.)
*/
def writerOptionsFromSchema[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
builder: NestedBuilder[T, V],
schema: StructType,
writeInt96: Boolean = false): T = {
schema.foreach(field =>
writerOptionsFromField(builder, field.dataType, field.name, field.nullable, writeInt96)
)
builder.asInstanceOf[T]
}
}
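To make the nullability rules from the review thread above concrete, here is a minimal, self-contained sketch using only plain Spark SQL types (no cudf builders; the object and method names are hypothetical). It prints the flag each column and child column would receive: field.nullable for top-level and struct fields, containsNull for array elements, valueContainsNull for map values, and never-nullable for map keys.

import org.apache.spark.sql.types._

// Sketch of the per-column nullability decisions described above, independent of
// any writer: walk a Spark schema and report the flag each (nested) column gets.
object WriterNullabilitySketch {
  def describe(dt: DataType, name: String, nullable: Boolean): Seq[String] = dt match {
    case s: StructType =>
      s"$name: struct, nullable=$nullable" +:
        s.fields.flatMap(f => describe(f.dataType, s"$name.${f.name}", f.nullable)).toSeq
    case ArrayType(elem, containsNull) =>
      s"$name: array, nullable=$nullable" +: describe(elem, s"$name.element", containsNull)
    case MapType(k, v, valueContainsNull) =>
      (s"$name: map, nullable=$nullable" +:
        describe(k, s"$name.key", nullable = false)) ++   // map keys are never nullable
        describe(v, s"$name.value", valueContainsNull)
    case other =>
      Seq(s"$name: ${other.simpleString}, nullable=$nullable")
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("tags", ArrayType(StringType, containsNull = true)),
      StructField("attrs", MapType(StringType, IntegerType, valueContainsNull = false))))
    schema.foreach(f => describe(f.dataType, f.name, f.nullable).foreach(println))
  }
}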
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcUtils}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._

object GpuOrcFileFormat extends Logging {
// The classname used when Spark is configured to use the Hive implementation for ORC.
@@ -161,18 +161,9 @@ class GpuOrcWriter(path: String,
extends ColumnarOutputWriter(path, context, dataSchema, "ORC") {

override val tableWriter: TableWriter = {
val builder= ORCWriterOptions.builder()
val builder = SchemaUtils
.writerOptionsFromSchema(ORCWriterOptions.builder(), dataSchema)
.withCompressionType(CompressionType.valueOf(OrcConf.COMPRESS.getString(conf)))

dataSchema.foreach(entry => {
if (entry.nullable) {
builder.withColumnNames(entry.name)
} else {
builder.withNotNullableColumnNames(entry.name)
}
})

val options = builder.build()
Table.writeORCChunked(options, this)
Table.writeORCChunked(builder.build(), this)
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetColumnWriterOptions, ParquetWriterOptions, Scalar}
import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetWriterOptions, Scalar}
import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuListUtils, GpuLiteral, GpuMapUtils, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.GpuExpressionsUtils.columnarEvalToColumn
import com.nvidia.spark.rapids.RapidsPluginImplicits._