Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert String to DecimalType without casting to FloatType [databricks] #4081

Merged
merged 9 commits into from
Nov 18, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.nvidia.spark.rapids

import ai.rapids.cudf.{ColumnVector, ColumnView, DType, Scalar}

import org.apache.spark.sql.types.DecimalType
revans2 marked this conversation as resolved.
Show resolved Hide resolved

object FloatUtils extends Arm {

def nanToZero(cv: ColumnView): ColumnVector = {
Expand All @@ -40,8 +42,10 @@ object FloatUtils extends Arm {
def getNanScalar(dType: DType): Scalar = {
if (dType == DType.FLOAT64) {
Scalar.fromDouble(Double.NaN)
} else {
} else if (dType == DType.FLOAT32) {
Scalar.fromFloat(Float.NaN)
} else {
throw new IllegalArgumentException("NaNs are only supported for Float types")
}
}

Expand Down
119 changes: 79 additions & 40 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,6 @@ class CastExprMeta[INPUT <: CastBase](
YearParseUtil.tagParseStringAsDate(conf, this)
case (_: StringType, _: DateType) =>
YearParseUtil.tagParseStringAsDate(conf, this)
case (_: StringType, _: DecimalType) if !conf.isCastStringToDecimalEnabled =>
// FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019
willNotWorkOnGpu("Currently string to decimal type on the GPU might produce " +
"results which slightly differed from the correct results when the string represents " +
"any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For " +
"instance, the GPU returns 99999999999999987 when given the input string " +
"\"99999999999999999\". The cause of divergence is that we can not cast strings " +
"containing scientific notation to decimal directly. So, we have to cast strings " +
"to floats firstly. Then, cast floats to decimals. The first step may lead to " +
"precision loss. To enable this operation on the GPU, set " +
s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.")
case (structType: StructType, StringType) =>
structType.foreach { field =>
recursiveTagExprForGpuCheck(field.dataType, StringType, depth + 1)
Expand Down Expand Up @@ -162,7 +151,10 @@ object GpuCast extends Arm {

val INVALID_FLOAT_CAST_MSG: String = "At least one value is either null or is an invalid number"

def sanitizeStringToFloat(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = {
def sanitizeStringToFloat(
input: ColumnVector,
ansiEnabled: Boolean,
replaceInfinity: Boolean = true): ColumnVector = {

// This regex gets applied after the transformation to normalize use of Inf and is
// just strict enough to filter out known edge cases that would result in incorrect
Expand All @@ -189,38 +181,42 @@ object GpuCast extends Arm {
withResource(GpuScalar.from(null, DataTypes.StringType)) { nullString =>
// filter out strings containing breaking whitespace
val withoutWhitespace = withResource(ColumnVector.fromStrings("\r", "\n")) {
verticalWhitespace =>
withResource(stripped.contains(verticalWhitespace)) {
_.ifElse(nullString, stripped)
}
verticalWhitespace =>
withResource(stripped.contains(verticalWhitespace)) {
_.ifElse(nullString, stripped)
}
}
// replace all possible versions of "Inf" and "Infinity" with "Inf"
val inf = withResource(withoutWhitespace) { _ =>
val postInfProcessing = if (replaceInfinity) {
// replace all possible versions of "Inf" and "Infinity" with "Inf"
val inf = withResource(withoutWhitespace) { _ =>
withoutWhitespace.stringReplaceWithBackrefs(
"(?:[iI][nN][fF])" + "(?:[iI][nN][iI][tT][yY])?", "Inf")
}
// replace "+Inf" with "Inf" because cuDF only supports "Inf" and "-Inf"
val infWithoutPlus = withResource(inf) { _ =>
withResource(GpuScalar.from("+Inf", DataTypes.StringType)) { search =>
withResource(GpuScalar.from("Inf", DataTypes.StringType)) { replace =>
inf.stringReplace(search, replace)
"(?:[iI][nN][fF])" + "(?:[iI][nN][iI][tT][yY])?", "Inf")
}
// replace "+Inf" with "Inf" because cuDF only supports "Inf" and "-Inf"
withResource(inf) { _ =>
withResource(GpuScalar.from("+Inf", DataTypes.StringType)) { search =>
withResource(GpuScalar.from("Inf", DataTypes.StringType)) { replace =>
inf.stringReplace(search, replace)
}
}
}
} else {
withoutWhitespace
}
// filter out any strings that are not valid floating point numbers according
// to the regex pattern
val floatOrNull = withResource(infWithoutPlus) { _ =>
withResource(infWithoutPlus.matchesRe(VALID_FLOAT_REGEX)) { isFloat =>
val floatOrNull = withResource(postInfProcessing) { _ =>
withResource(postInfProcessing.matchesRe(VALID_FLOAT_REGEX)) { isFloat =>
if (ansiEnabled) {
withResource(isFloat.all()) { allMatch =>
// Check that all non-null values are valid floats.
if (allMatch.isValid && !allMatch.getBoolean) {
throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG)
}
infWithoutPlus.incRefCount()
postInfProcessing.incRefCount()
}
} else {
isFloat.ifElse(infWithoutPlus, nullString)
isFloat.ifElse(postInfProcessing, nullString)
}
}
}
Expand Down Expand Up @@ -502,13 +498,8 @@ object GpuCast extends Arm {
}
}
case (StringType, dt: DecimalType) =>
// To apply HALF_UP rounding strategy during casting to decimal, we firstly cast
// string to fp64. Then, cast fp64 to target decimal type to enforce HALF_UP rounding.
withResource(input.strip()) { trimmed =>
withResource(castStringToFloats(trimmed, ansiMode, DType.FLOAT64)) { fp =>
castFloatsToDecimal(fp, dt, ansiMode)
}
}
castStringToDecimal(input, ansiMode, dt)

case (ByteType | ShortType | IntegerType | LongType, dt: DecimalType) =>
castIntegralsToDecimal(input, dt, ansiMode)

Expand Down Expand Up @@ -777,17 +768,65 @@ object GpuCast extends Arm {
}
}

def castStringToDecimal(
input: ColumnView,
ansiEnabled: Boolean,
dt: DecimalType): ColumnVector = {
// 1. Sanitize strings to make sure all are floating points
// 2. Identify all fixed point values
// 3. Cast String to newDt (newDt = dt.scale + 1). Promote precision if needed. This step is
// required so we can round up if needed in the final step
// 4. Now cast to newDt to dt (Decimal to Decimal)
def getInterimDecimalPromoteIfNeeded(dt: DecimalType): DecimalType = {
if (dt.scale + 1 > dt.precision) {
revans2 marked this conversation as resolved.
Show resolved Hide resolved
// promote if possible or throw
if (dt.precision == Decimal.MAX_LONG_DIGITS) {
revans2 marked this conversation as resolved.
Show resolved Hide resolved
//We don't support Decimal 128
revans2 marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("One or more values exceed the maximum supported " +
revans2 marked this conversation as resolved.
Show resolved Hide resolved
"Decimal precision while conversion")
}
DecimalType(dt.precision + 1, dt.scale + 1)
revans2 marked this conversation as resolved.
Show resolved Hide resolved
}
DecimalType(dt.precision, dt.scale + 1)
}

withResource(input.strip()) { trimmed =>
revans2 marked this conversation as resolved.
Show resolved Hide resolved
withResource(GpuCast.sanitizeStringToFloat(trimmed, ansiEnabled, false)) { sanitized =>
revans2 marked this conversation as resolved.
Show resolved Hide resolved
val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt)
val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt)
withResource(Scalar.fromNull(interimDt)) { nulls =>
revans2 marked this conversation as resolved.
Show resolved Hide resolved
withResource(sanitized.isFixedPoint(interimDt)) { isFixedPoints =>
if (ansiEnabled) {
withResource(isFixedPoints.all()) { allFixedPoints =>
if (allFixedPoints.isValid && !allFixedPoints.getBoolean) {
throw new ArithmeticException(s"One or more values cannot be " +
s"represented as Decimal(${dt.precision}, ${dt.scale})")
}
}
}
// intermediate step needed so we can make sure we can round up
withResource(input.castTo(interimDt)) { interimDecimals =>
withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals =>
// cast Decimal to the Decimal that's needed
castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled)
}
}
}
}
}
}
}

def castStringToFloats(
input: ColumnVector,
ansiEnabled: Boolean,
dType: DType): ColumnVector = {

// 1. convert the different infinities to "Inf"/"-Inf" which is the only variation cudf
// understands
// 2. identify the nans
// 3. identify the floats. "nan", "null" and letters are not considered floats
// 4. if ansi is enabled we want to throw and exception if the string is neither float nor nan
// 5. convert everything thats not floats to null
// 4. if ansi is enabled we want to throw an exception if the string is neither float nor nan
// 5. convert everything that's not floats to null
// 6. set the indices where we originally had nans to Float.NaN
//
// NOTE Limitation: "1.7976931348623159E308" and "-1.7976931348623159E308" are not considered
Expand Down Expand Up @@ -1146,7 +1185,7 @@ object GpuCast extends Arm {
// We rely on containerDecimal to perform preciser rounding. So, we have to take extra
// space cost of container into consideration when we run bound check.
val containerScaleBound = DType.DECIMAL64_MAX_PRECISION - (dt.scale + 1)
val bound = math.pow(10, (dt.precision - dt.scale) min containerScaleBound)
val bound = math.pow(10, (dt.precision - dt.scale).min(containerScaleBound))
if (ansiMode) {
assertValuesInRange(rounded,
minValue = Scalar.fromDouble(-bound),
Expand Down