
Hive Text parsing of invalid date strings should not cause exceptions.
Fixes NVIDIA#7089. There were two problems:
  1. Strings between field delimiters should not be trimmed before casting to dates.
  2. Invalid date strings should not cause exceptions; they should return null
     values, as is the convention in Hive's `LazySimpleSerDe`.
mythrocks committed Dec 1, 2022
1 parent 313f363 commit 198efdf
Showing 5 changed files with 30 additions and 13 deletions.
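
For context, the convention being matched is that of Hive's `LazySimpleSerDe`: the raw bytes between field delimiters are handed to the date parser untrimmed, and a field that fails to parse becomes a null cell instead of failing the query. A minimal sketch of that behavior in Scala (illustrative only; `hiveStyleDateOrNull` is not a real Hive or plugin function, and `java.sql.Date.valueOf` stands in for the SerDe's parser):

import java.sql.Date

// Any parse failure, including one caused by untrimmed whitespace,
// yields null rather than an exception.
def hiveStyleDateOrNull(field: String): Date =
  try Date.valueOf(field) // strict yyyy-[m]m-[d]d parse; no trimming first
  catch { case _: IllegalArgumentException => null }

// hiveStyleDateOrNull("2021-09-16")  -> 2021-09-16
// hiveStyleDateOrNull(" 2021-09-16") -> null (leading space is not stripped)
// hiveStyleDateOrNull("2021-40-32")  -> null (month 40 is out of range)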
integration_tests/src/main/python/hive_delimited_text_test.py (4 changes: 0 additions & 4 deletions)
@@ -153,10 +153,6 @@ def read_impl(spark):
pytest.param('hive-delim-text/timestamp-err', timestamp_schema, {},
marks=pytest.mark.xfail(reason="GPU timestamp reads are more permissive than CPU. "
"See https://github.com/NVIDIA/spark-rapids/issues/7086")),
- pytest.param('hive-delim-text/date-err', date_schema, {},
-              marks=pytest.mark.xfail(reason="GPU read trims date string whitespace, "
-                                             "and errors out on invalid dates."
-                                             "See https://github.com/NVIDIA/spark-rapids/issues/7089.")),
# Test that lines beginning with comments ('#') aren't skipped.
('hive-delim-text/comments', StructType([StructField("str", StringType()),

This file was deleted.

@@ -4,3 +4,6 @@
2021-09-16
2021-09-16
2020-10-16
+ 2021-09-16
+ 2021-40-32
+ AnotherObviouslyInvalidDate
@@ -333,7 +333,7 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
case dt: DecimalType =>
castStringToDecimal(table.getColumn(i), dt)
case DataTypes.DateType =>
- castStringToDate(table.getColumn(i), DType.TIMESTAMP_DAYS, failOnInvalid = true)
+ castStringToDate(table.getColumn(i), DType.TIMESTAMP_DAYS)
case DataTypes.TimestampType =>
castStringToTimestamp(table.getColumn(i), timestampFormat,
DType.TIMESTAMP_MICROSECONDS)
@@ -358,6 +358,10 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
def dateFormat: String
def timestampFormat: String

+ def castStringToDate(input: ColumnVector, dt: DType): ColumnVector = {
+   castStringToDate(input, dt, failOnInvalid = true)
+ }

def castStringToDate(input: ColumnVector, dt: DType, failOnInvalid: Boolean): ColumnVector = {
val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true)
withResource(input.strip()) { stripped =>
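Note the shape of this change: the call site now goes through a new two-argument `castStringToDate` that simply delegates with `failOnInvalid = true`, so the base reader stays strict by default while format-specific subclasses get a single seam to override. A rough sketch of the pattern, with plain strings standing in for cudf columns (`StrictReaderSketch`, `HiveReaderSketch`, and `castDate` are illustrative names, not the plugin's API):

import java.sql.Date

class StrictReaderSketch {
  // Call sites use the short overload, so overriding it redirects every date cast.
  def castDate(s: String): Option[Date] = castDate(s, failOnInvalid = true)

  def castDate(s: String, failOnInvalid: Boolean): Option[Date] =
    try Some(Date.valueOf(s.trim)) // the base reader strips whitespace first
    catch {
      case e: IllegalArgumentException => if (failOnInvalid) throw e else None
    }
}

// A Hive-flavored subclass swaps in no-trim, null-on-invalid semantics:
class HiveReaderSketch extends StrictReaderSketch {
  override def castDate(s: String): Option[Date] =
    try Some(Date.valueOf(s)) // no trim; bad input becomes None, never an exception
    catch { case _: IllegalArgumentException => None }
}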
@@ -18,19 +18,20 @@ package org.apache.spark.sql.hive.rapids

import ai.rapids.cudf.{ColumnVector, DType, Scalar, Schema, Table}
import com.nvidia.spark.RebaseHelper.withResource
- import com.nvidia.spark.rapids.{ColumnarPartitionReaderWithPartitionValues, CSVPartitionReaderBase, GpuColumnVector, GpuExec, GpuMetric, HostStringBufferer, HostStringBuffererFactory, PartitionReaderIterator, PartitionReaderWithBytesRead, RapidsConf}
+ import com.nvidia.spark.rapids.{CSVPartitionReaderBase, ColumnarPartitionReaderWithPartitionValues, DateUtils, GpuColumnVector, GpuExec, GpuMetric, HostStringBufferer, HostStringBuffererFactory, PartitionReaderIterator, PartitionReaderWithBytesRead, RapidsConf}
import com.nvidia.spark.rapids.GpuMetric.{BUFFER_TIME, DEBUG_LEVEL, DESCRIPTION_BUFFER_TIME, DESCRIPTION_FILTER_TIME, DESCRIPTION_GPU_DECODE_TIME, DESCRIPTION_PEAK_DEVICE_MEMORY, ESSENTIAL_LEVEL, FILTER_TIME, GPU_DECODE_TIME, MODERATE_LEVEL, NUM_OUTPUT_ROWS, PEAK_DEVICE_MEMORY}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.shims.{ShimFilePartitionReaderFactory, ShimSparkPlan, SparkShimImpl}

import java.net.URI
import java.util.concurrent.TimeUnit.NANOSECONDS
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}

import scala.collection.JavaConverters._
import scala.collection.immutable.HashSet
import scala.collection.mutable

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
@@ -535,4 +536,23 @@ class GpuHiveDelimitedTextPartitionReader(conf: Configuration,
}
}
}

+ /**
+  * Override of [[com.nvidia.spark.rapids.GpuTextBasedPartitionReader.castStringToDate()]],
+  * to convert parsed string columns to Dates.
+  * Two key differences from the base implementation, to comply with Hive LazySimpleSerDe
+  * semantics:
+  *   1. The input strings are not trimmed of whitespace.
+  *   2. Invalid date strings do not cause exceptions.
+  */
+ override def castStringToDate(input: ColumnVector, dt: DType): ColumnVector = {
+   val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true)
+   withResource(input.isTimestamp(cudfFormat)) { isDate =>
+     withResource(input.asTimestamp(dt, cudfFormat)) { asDate =>
+       withResource(Scalar.fromNull(dt)) { nullScalar =>
+         isDate.ifElse(asDate, nullScalar)
+       }
+     }
+   }
+ }
}
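
The override above is a standard cudf validate-then-select idiom: `isTimestamp(cudfFormat)` builds a boolean mask of the rows that match the strptime-style pattern, `asTimestamp` converts the whole column, and `ifElse` keeps a converted value only where the mask is true, substituting a null scalar everywhere else (validation comes first because the conversion by itself is not defined to null out unparseable rows). Expected row-wise results, assuming `dateFormat` resolves to the cudf pattern `%Y-%m-%d` (the padded example is illustrative and not part of the committed test data):

// "2021-09-16"                  -> mask: true  -> 2021-09-16
// " 2021-09-16"                 -> mask: false -> null (input is not stripped here)
// "2021-40-32"                  -> mask: false -> null (month out of range)
// "AnotherObviouslyInvalidDate" -> mask: false -> null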
