From a9a6ccc5908c020658983ac279de2c388c4782cc Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 10 Nov 2021 20:30:57 -0800 Subject: [PATCH 1/7] string to decimal Signed-off-by: Raza Jafri --- .../com/nvidia/spark/rapids/FloatUtils.scala | 6 +- .../com/nvidia/spark/rapids/GpuCast.scala | 112 +++++++++++------- 2 files changed, 77 insertions(+), 41 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala index 4be623cc929..14e86a1078e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala @@ -18,6 +18,8 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{ColumnVector, ColumnView, DType, Scalar} +import org.apache.spark.sql.types.DecimalType + object FloatUtils extends Arm { def nanToZero(cv: ColumnView): ColumnVector = { @@ -40,8 +42,10 @@ object FloatUtils extends Arm { def getNanScalar(dType: DType): Scalar = { if (dType == DType.FLOAT64) { Scalar.fromDouble(Double.NaN) - } else { + } else if (dType == DType.FLOAT32) { Scalar.fromFloat(Float.NaN) + } else { + throw new IllegalArgumentException("NaNs are only supported for Float types") } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index b5c7f6b89a8..35f450d72e2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -97,17 +97,6 @@ class CastExprMeta[INPUT <: CastBase]( YearParseUtil.tagParseStringAsDate(conf, this) case (_: StringType, _: DateType) => YearParseUtil.tagParseStringAsDate(conf, this) - case (_: StringType, _: DecimalType) if !conf.isCastStringToDecimalEnabled => - // FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019 - willNotWorkOnGpu("Currently string to decimal type on the GPU might produce " + - "results which slightly differed from the correct results when the string represents " + - "any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For " + - "instance, the GPU returns 99999999999999987 when given the input string " + - "\"99999999999999999\". The cause of divergence is that we can not cast strings " + - "containing scientific notation to decimal directly. So, we have to cast strings " + - "to floats firstly. Then, cast floats to decimals. The first step may lead to " + - "precision loss. To enable this operation on the GPU, set " + - s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.") case (structType: StructType, StringType) => structType.foreach { field => recursiveTagExprForGpuCheck(field.dataType, StringType, depth + 1) @@ -162,7 +151,10 @@ object GpuCast extends Arm { val INVALID_FLOAT_CAST_MSG: String = "At least one value is either null or is an invalid number" - def sanitizeStringToFloat(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { + def sanitizeStringToFloat( + input: ColumnVector, + ansiEnabled: Boolean, + replaceInfinity: Boolean = true): ColumnVector = { // This regex gets applied after the transformation to normalize use of Inf and is // just strict enough to filter out known edge cases that would result in incorrect @@ -189,38 +181,42 @@ object GpuCast extends Arm { withResource(GpuScalar.from(null, DataTypes.StringType)) { nullString => // filter out strings containing breaking whitespace val withoutWhitespace = withResource(ColumnVector.fromStrings("\r", "\n")) { - verticalWhitespace => - withResource(stripped.contains(verticalWhitespace)) { - _.ifElse(nullString, stripped) - } + verticalWhitespace => + withResource(stripped.contains(verticalWhitespace)) { + _.ifElse(nullString, stripped) + } } - // replace all possible versions of "Inf" and "Infinity" with "Inf" - val inf = withResource(withoutWhitespace) { _ => + val postInfProcessing = if (replaceInfinity) { + // replace all possible versions of "Inf" and "Infinity" with "Inf" + val inf = withResource(withoutWhitespace) { _ => withoutWhitespace.stringReplaceWithBackrefs( - "(?:[iI][nN][fF])" + "(?:[iI][nN][iI][tT][yY])?", "Inf") - } - // replace "+Inf" with "Inf" because cuDF only supports "Inf" and "-Inf" - val infWithoutPlus = withResource(inf) { _ => - withResource(GpuScalar.from("+Inf", DataTypes.StringType)) { search => - withResource(GpuScalar.from("Inf", DataTypes.StringType)) { replace => - inf.stringReplace(search, replace) + "(?:[iI][nN][fF])" + "(?:[iI][nN][iI][tT][yY])?", "Inf") + } + // replace "+Inf" with "Inf" because cuDF only supports "Inf" and "-Inf" + withResource(inf) { _ => + withResource(GpuScalar.from("+Inf", DataTypes.StringType)) { search => + withResource(GpuScalar.from("Inf", DataTypes.StringType)) { replace => + inf.stringReplace(search, replace) + } } } + } else { + withoutWhitespace } // filter out any strings that are not valid floating point numbers according // to the regex pattern - val floatOrNull = withResource(infWithoutPlus) { _ => - withResource(infWithoutPlus.matchesRe(VALID_FLOAT_REGEX)) { isFloat => + val floatOrNull = withResource(postInfProcessing) { _ => + withResource(postInfProcessing.matchesRe(VALID_FLOAT_REGEX)) { isFloat => if (ansiEnabled) { withResource(isFloat.all()) { allMatch => // Check that all non-null values are valid floats. if (allMatch.isValid && !allMatch.getBoolean) { throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) } - infWithoutPlus.incRefCount() + postInfProcessing.incRefCount() } } else { - isFloat.ifElse(infWithoutPlus, nullString) + isFloat.ifElse(postInfProcessing, nullString) } } } @@ -502,13 +498,8 @@ object GpuCast extends Arm { } } case (StringType, dt: DecimalType) => - // To apply HALF_UP rounding strategy during casting to decimal, we firstly cast - // string to fp64. Then, cast fp64 to target decimal type to enforce HALF_UP rounding. - withResource(input.strip()) { trimmed => - withResource(castStringToFloats(trimmed, ansiMode, DType.FLOAT64)) { fp => - castFloatsToDecimal(fp, dt, ansiMode) - } - } + castStringToDecimal(input, ansiMode, dt) + case (ByteType | ShortType | IntegerType | LongType, dt: DecimalType) => castIntegralsToDecimal(input, dt, ansiMode) @@ -777,17 +768,58 @@ object GpuCast extends Arm { } } + def castStringToDecimal( + input: ColumnView, + ansiEnabled: Boolean, + dt: DecimalType): ColumnVector = { + def getInterimDecimalPromoteIfNeeded(dt: DecimalType): DecimalType = { + if (dt.scale + 1 > dt.precision) { + // promote if possible or throw + if (dt.precision == Decimal.MAX_LONG_DIGITS) { + //We don't support Decimal 128 + throw new IllegalArgumentException("One or more values exceed the maximum supported " + + "Decimal precision while conversion") + } + DecimalType(dt.precision + 1, dt.scale + 1) + } + DecimalType(dt.precision, dt.scale + 1) + } + + withResource(input.strip()) { trimmed => + withResource(GpuCast.sanitizeStringToFloat(trimmed, ansiEnabled, false)) { sanitized => + val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt) + val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt) + withResource(Scalar.fromNull(interimDt)) { nulls => + withResource(sanitized.isFixedPoint(interimDt)) { isFixedPoints => + if (ansiEnabled) { + withResource(isFixedPoints.all()) { allFixedPoints => + if (allFixedPoints.isValid && !allFixedPoints.getBoolean) { + throw new ArithmeticException(s"One or more values cannot be " + + s"represented as Decimal(${dt.precision}, ${dt.scale})") + } + } + } + withResource(input.castTo(interimDt)) { interimDecimals => + withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals => + castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled) + } + } + } + } + } + } + } + def castStringToFloats( input: ColumnVector, ansiEnabled: Boolean, dType: DType): ColumnVector = { - // 1. convert the different infinities to "Inf"/"-Inf" which is the only variation cudf // understands // 2. identify the nans // 3. identify the floats. "nan", "null" and letters are not considered floats - // 4. if ansi is enabled we want to throw and exception if the string is neither float nor nan - // 5. convert everything thats not floats to null + // 4. if ansi is enabled we want to throw an exception if the string is neither float nor nan + // 5. convert everything that's not floats to null // 6. set the indices where we originally had nans to Float.NaN // // NOTE Limitation: "1.7976931348623159E308" and "-1.7976931348623159E308" are not considered @@ -1146,7 +1178,7 @@ object GpuCast extends Arm { // We rely on containerDecimal to perform preciser rounding. So, we have to take extra // space cost of container into consideration when we run bound check. val containerScaleBound = DType.DECIMAL64_MAX_PRECISION - (dt.scale + 1) - val bound = math.pow(10, (dt.precision - dt.scale) min containerScaleBound) + val bound = math.pow(10, (dt.precision - dt.scale).min(containerScaleBound)) if (ansiMode) { assertValuesInRange(rounded, minValue = Scalar.fromDouble(-bound), From 459c90702f8efe34de54a5005d9db1873f3e2afe Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 10 Nov 2021 20:44:00 -0800 Subject: [PATCH 2/7] added docs Signed-off-by: Raza Jafri --- .../src/main/scala/com/nvidia/spark/rapids/GpuCast.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 35f450d72e2..7357bb010d1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -772,6 +772,11 @@ object GpuCast extends Arm { input: ColumnView, ansiEnabled: Boolean, dt: DecimalType): ColumnVector = { + // 1. Sanitize strings to make sure all are floating points + // 2. Identify all fixed point values + // 3. Cast String to newDt (newDt = dt.scale + 1). Promote precision if needed. This step is + // required so we can round up if needed in the final step + // 4. Now cast to newDt to dt (Decimal to Decimal) def getInterimDecimalPromoteIfNeeded(dt: DecimalType): DecimalType = { if (dt.scale + 1 > dt.precision) { // promote if possible or throw @@ -799,8 +804,10 @@ object GpuCast extends Arm { } } } + // intermediate step needed so we can make sure we can round up withResource(input.castTo(interimDt)) { interimDecimals => withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals => + // cast Decimal to the Decimal that's needed castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled) } } From 4de3c8bec21abdc4d189d9b13f155227be78639b Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 15 Nov 2021 11:22:52 -0800 Subject: [PATCH 3/7] addressed review comments Signed-off-by: Raza Jafri --- .../com/nvidia/spark/rapids/GpuCast.scala | 121 ++++++++++++------ .../com/nvidia/spark/rapids/CastOpSuite.scala | 26 ++-- 2 files changed, 99 insertions(+), 48 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 7357bb010d1..38337583a92 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -79,6 +79,9 @@ class CastExprMeta[INPUT <: CastBase]( "converting floating point data types to strings and this can produce results that " + "differ from the default behavior in Spark. To enable this operation on the GPU, set" + s" ${RapidsConf.ENABLE_CAST_FLOAT_TO_STRING} to true.") + case (_: StringType, dt: DecimalType) if (dt.precision + 1 > Decimal.MAX_LONG_DIGITS) => + willNotWorkOnGpu(s"Converting to $dt will result in a 128-bit temporary type that is " + + s"not supported on the GPU") case (_: StringType, _: FloatType | _: DoubleType) if !conf.isCastStringToFloatEnabled => willNotWorkOnGpu("Currently hex values aren't supported on the GPU. Also note " + "that casting from string to float types on the GPU returns incorrect results when " + @@ -151,10 +154,60 @@ object GpuCast extends Arm { val INVALID_FLOAT_CAST_MSG: String = "At least one value is either null or is an invalid number" + def sanitizeStringToDecimal( + input: ColumnView, + ansiEnabled: Boolean): ColumnVector = { + + // This regex gets applied to filter out known edge cases that would result in incorrect + // values. We further filter out invalid values using the cuDF isFixedPoint method. + val VALID_DEC_REGEX = + "^" + // start of line + "[+\\-]?" + // optional + or - at start of string + "(" + + "(" + + "(" + + "([0-9]+)|" + // digits, OR + "([0-9]*\\.[0-9]+)|" + // decimal with optional leading and mandatory trailing, OR + "([0-9]+\\.[0-9]*)" + // decimal with mandatory leading and optional trailing + ")" + + "([eE][+\\-]?[0-9]+)?" + // exponent + ")" + + ")" + + "$" // end of line + + withResource(input.strip()) { stripped => + withResource(GpuScalar.from(null, DataTypes.StringType)) { nullString => + // filter out strings containing breaking whitespace + val withoutWhitespace = withResource(ColumnVector.fromStrings("\r", "\n")) { + verticalWhitespace => + withResource(stripped.contains(verticalWhitespace)) { + _.ifElse(nullString, stripped) + } + } + // filter out any strings that are not valid fixed-point numbers according + // to the regex pattern + withResource(withoutWhitespace) { _ => + withResource(withoutWhitespace.matchesRe(VALID_DEC_REGEX)) { isFixedPoint => + if (ansiEnabled) { + withResource(isFixedPoint.all()) { allMatch => + // Check that all non-null values are valid floats. + if (allMatch.isValid && !allMatch.getBoolean) { + throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) + } + withoutWhitespace.incRefCount() + } + } else { + isFixedPoint.ifElse(withoutWhitespace, nullString) + } + } + } + } + } + } + def sanitizeStringToFloat( input: ColumnVector, - ansiEnabled: Boolean, - replaceInfinity: Boolean = true): ColumnVector = { + ansiEnabled: Boolean): ColumnVector = { // This regex gets applied after the transformation to normalize use of Inf and is // just strict enough to filter out known edge cases that would result in incorrect @@ -186,37 +239,33 @@ object GpuCast extends Arm { _.ifElse(nullString, stripped) } } - val postInfProcessing = if (replaceInfinity) { // replace all possible versions of "Inf" and "Infinity" with "Inf" val inf = withResource(withoutWhitespace) { _ => withoutWhitespace.stringReplaceWithBackrefs( "(?:[iI][nN][fF])" + "(?:[iI][nN][iI][tT][yY])?", "Inf") } // replace "+Inf" with "Inf" because cuDF only supports "Inf" and "-Inf" - withResource(inf) { _ => + val infWithoutPlus = withResource(inf) { _ => withResource(GpuScalar.from("+Inf", DataTypes.StringType)) { search => withResource(GpuScalar.from("Inf", DataTypes.StringType)) { replace => inf.stringReplace(search, replace) } } } - } else { - withoutWhitespace - } // filter out any strings that are not valid floating point numbers according // to the regex pattern - val floatOrNull = withResource(postInfProcessing) { _ => - withResource(postInfProcessing.matchesRe(VALID_FLOAT_REGEX)) { isFloat => + val floatOrNull = withResource(infWithoutPlus) { _ => + withResource(infWithoutPlus.matchesRe(VALID_FLOAT_REGEX)) { isFloat => if (ansiEnabled) { withResource(isFloat.all()) { allMatch => // Check that all non-null values are valid floats. if (allMatch.isValid && !allMatch.getBoolean) { throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) } - postInfProcessing.incRefCount() + infWithoutPlus.incRefCount() } } else { - isFloat.ifElse(postInfProcessing, nullString) + isFloat.ifElse(infWithoutPlus, nullString) } } } @@ -778,38 +827,32 @@ object GpuCast extends Arm { // required so we can round up if needed in the final step // 4. Now cast to newDt to dt (Decimal to Decimal) def getInterimDecimalPromoteIfNeeded(dt: DecimalType): DecimalType = { - if (dt.scale + 1 > dt.precision) { - // promote if possible or throw - if (dt.precision == Decimal.MAX_LONG_DIGITS) { - //We don't support Decimal 128 - throw new IllegalArgumentException("One or more values exceed the maximum supported " + - "Decimal precision while conversion") - } - DecimalType(dt.precision + 1, dt.scale + 1) + if (dt.precision + 1 > Decimal.MAX_LONG_DIGITS) { + //We don't support Decimal 128 + throw new IllegalArgumentException("One or more values exceed the maximum supported " + + "Decimal precision while conversion") } - DecimalType(dt.precision, dt.scale + 1) + DecimalType(dt.precision + 1, dt.scale + 1) } - withResource(input.strip()) { trimmed => - withResource(GpuCast.sanitizeStringToFloat(trimmed, ansiEnabled, false)) { sanitized => - val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt) - val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt) - withResource(Scalar.fromNull(interimDt)) { nulls => - withResource(sanitized.isFixedPoint(interimDt)) { isFixedPoints => - if (ansiEnabled) { - withResource(isFixedPoints.all()) { allFixedPoints => - if (allFixedPoints.isValid && !allFixedPoints.getBoolean) { - throw new ArithmeticException(s"One or more values cannot be " + - s"represented as Decimal(${dt.precision}, ${dt.scale})") - } - } + withResource(GpuCast.sanitizeStringToDecimal(input, ansiEnabled)) { sanitized => + val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt) + val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt) + withResource(sanitized.isFixedPoint(interimDt)) { isFixedPoints => + if (ansiEnabled) { + withResource(isFixedPoints.all()) { allFixedPoints => + if (allFixedPoints.isValid && !allFixedPoints.getBoolean) { + throw new ArithmeticException(s"One or more values cannot be " + + s"represented as Decimal(${dt.precision}, ${dt.scale})") } - // intermediate step needed so we can make sure we can round up - withResource(input.castTo(interimDt)) { interimDecimals => - withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals => - // cast Decimal to the Decimal that's needed - castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled) - } + } + } + // intermediate step needed so we can make sure we can round up + withResource(input.castTo(interimDt)) { interimDecimals => + withResource(Scalar.fromNull(interimDt)) { nulls => + withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals => + // cast Decimal to the Decimal that's needed + castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled) } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 519b2d14673..2a63861334b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -875,12 +875,20 @@ class CastOpSuite extends GpuExpressionTestSuite { } test("cast string to decimal") { - List(-18, -10, -3, 0, 1, 5, 15).foreach { scale => - testCastToDecimal(DataTypes.StringType, scale, + List(-17, -10, -3, 0, 1, 5, 15).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale, precision = 17, customRandGenerator = Some(new scala.util.Random(1234L))) } } + test("cast string to decimal (fail)") { + assertThrows[IllegalArgumentException]( + List(-18, 18, 2, 32, 8).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale, + customRandGenerator = Some(new scala.util.Random(1234L))) + }) + } + test("cast string to decimal (include NaN/INF/-INF)") { def doubleStrings(ss: SparkSession): DataFrame = { val df1 = floatsAsStrings(ss).selectExpr("cast(c0 as Double) as col") @@ -888,7 +896,7 @@ class CastOpSuite extends GpuExpressionTestSuite { df1.unionAll(df2) } List(-10, -1, 0, 1, 10).foreach { scale => - testCastToDecimal(DataTypes.StringType, scale = scale, + testCastToDecimal(DataTypes.StringType, scale = scale, precision = 17, customDataGenerator = Some(doubleStrings)) } } @@ -898,15 +906,15 @@ class CastOpSuite extends GpuExpressionTestSuite { import ss.sqlContext.implicits._ column.toDF("col") } - testCastToDecimal(DataTypes.StringType, scale = 7, + testCastToDecimal(DataTypes.StringType, scale = 7, precision = 17, customDataGenerator = Some(specialGenerator(Seq("9999999999")))) - testCastToDecimal(DataTypes.StringType, scale = 2, + testCastToDecimal(DataTypes.StringType, scale = 2, precision = 17, customDataGenerator = Some(specialGenerator(Seq("999999999999999")))) - testCastToDecimal(DataTypes.StringType, scale = 0, + testCastToDecimal(DataTypes.StringType, scale = 0, precision = 17, customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) - testCastToDecimal(DataTypes.StringType, scale = -1, + testCastToDecimal(DataTypes.StringType, scale = -1, precision = 17, customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) - testCastToDecimal(DataTypes.StringType, scale = -10, + testCastToDecimal(DataTypes.StringType, scale = -10, precision = 17, customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) } @@ -915,7 +923,7 @@ class CastOpSuite extends GpuExpressionTestSuite { exponentsAsStringsDf(ss).select(col("c0").as("col")) } List(-10, -1, 0, 1, 10).foreach { scale => - testCastToDecimal(DataTypes.StringType, scale = scale, + testCastToDecimal(DataTypes.StringType, scale = scale, precision = 17, customDataGenerator = Some(exponentsAsStrings), ansiEnabled = true) } From 877afea5bf039875731f9f7c5755056306697455 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 15 Nov 2021 11:27:18 -0800 Subject: [PATCH 4/7] fixed more docs Signed-off-by: Raza Jafri --- .../src/main/scala/com/nvidia/spark/rapids/GpuCast.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 38337583a92..979e5ed2a87 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -821,11 +821,11 @@ object GpuCast extends Arm { input: ColumnView, ansiEnabled: Boolean, dt: DecimalType): ColumnVector = { - // 1. Sanitize strings to make sure all are floating points + // 1. Sanitize strings to make sure all are fixed points // 2. Identify all fixed point values - // 3. Cast String to newDt (newDt = dt.scale + 1). Promote precision if needed. This step is - // required so we can round up if needed in the final step - // 4. Now cast to newDt to dt (Decimal to Decimal) + // 3. Cast String to newDt (newDt = dt. precision + 1, dt.scale + 1). Promote precision if + // needed. This step is required so we can round up if needed in the final step + // 4. Now cast newDt to dt (Decimal to Decimal) def getInterimDecimalPromoteIfNeeded(dt: DecimalType): DecimalType = { if (dt.precision + 1 > Decimal.MAX_LONG_DIGITS) { //We don't support Decimal 128 From 1a64d7c56b505806f878bdd661583778cbccc33e Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 16 Nov 2021 14:47:58 -0800 Subject: [PATCH 5/7] addressed review comments Signed-off-by: Raza Jafri --- .../com/nvidia/spark/rapids/FloatUtils.scala | 2 - .../com/nvidia/spark/rapids/GpuCast.scala | 88 +++++++++---------- .../com/nvidia/spark/rapids/RapidsMeta.scala | 1 + .../apache/spark/sql/rapids/GpuScalaUDF.scala | 4 +- .../nvidia/spark/rapids/AnsiCastOpSuite.scala | 8 +- 5 files changed, 47 insertions(+), 56 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala index 14e86a1078e..57379585802 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala @@ -18,8 +18,6 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{ColumnVector, ColumnView, DType, Scalar} -import org.apache.spark.sql.types.DecimalType - object FloatUtils extends Arm { def nanToZero(cv: ColumnView): ColumnVector = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 979e5ed2a87..9e0ce1d43a7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -79,9 +79,8 @@ class CastExprMeta[INPUT <: CastBase]( "converting floating point data types to strings and this can produce results that " + "differ from the default behavior in Spark. To enable this operation on the GPU, set" + s" ${RapidsConf.ENABLE_CAST_FLOAT_TO_STRING} to true.") - case (_: StringType, dt: DecimalType) if (dt.precision + 1 > Decimal.MAX_LONG_DIGITS) => - willNotWorkOnGpu(s"Converting to $dt will result in a 128-bit temporary type that is " + - s"not supported on the GPU") + case (_: StringType, dt: DecimalType) if dt.precision + 1 > Decimal.MAX_LONG_DIGITS => + willNotWorkOnGpu(s"Because of rounding requirements we cannot support $dt on the GPU") case (_: StringType, _: FloatType | _: DoubleType) if !conf.isCastStringToFloatEnabled => willNotWorkOnGpu("Currently hex values aren't supported on the GPU. Also note " + "that casting from string to float types on the GPU returns incorrect results when " + @@ -152,14 +151,13 @@ object GpuCast extends Arm { val INVALID_INPUT_MESSAGE: String = "Column contains at least one value that is not in the " + "required range" - val INVALID_FLOAT_CAST_MSG: String = "At least one value is either null or is an invalid number" + val INVALID_NUMBER_MSG: String = "At least one value is either null or is an invalid number" def sanitizeStringToDecimal( input: ColumnView, ansiEnabled: Boolean): ColumnVector = { - // This regex gets applied to filter out known edge cases that would result in incorrect - // values. We further filter out invalid values using the cuDF isFixedPoint method. + // This regex gets applied to filter out strings that aren't fixed-point. val VALID_DEC_REGEX = "^" + // start of line "[+\\-]?" + // optional + or - at start of string @@ -175,30 +173,21 @@ object GpuCast extends Arm { ")" + "$" // end of line - withResource(input.strip()) { stripped => - withResource(GpuScalar.from(null, DataTypes.StringType)) { nullString => - // filter out strings containing breaking whitespace - val withoutWhitespace = withResource(ColumnVector.fromStrings("\r", "\n")) { - verticalWhitespace => - withResource(stripped.contains(verticalWhitespace)) { - _.ifElse(nullString, stripped) - } - } - // filter out any strings that are not valid fixed-point numbers according - // to the regex pattern - withResource(withoutWhitespace) { _ => - withResource(withoutWhitespace.matchesRe(VALID_DEC_REGEX)) { isFixedPoint => - if (ansiEnabled) { - withResource(isFixedPoint.all()) { allMatch => - // Check that all non-null values are valid floats. - if (allMatch.isValid && !allMatch.getBoolean) { - throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) - } - withoutWhitespace.incRefCount() - } - } else { - isFixedPoint.ifElse(withoutWhitespace, nullString) + withResource(input.strip()) { withoutWhitespace => + // filter out any strings that are not valid fixed-point numbers according + // to the regex pattern + withResource(withoutWhitespace.matchesRe(VALID_DEC_REGEX)) { isFixedPoint => + if (ansiEnabled) { + withResource(isFixedPoint.all()) { allMatch => + // Check that all non-null values are valid floats. + if (allMatch.isValid && !allMatch.getBoolean) { + throw new NumberFormatException(GpuCast.INVALID_NUMBER_MSG) } + withoutWhitespace.incRefCount() + } + } else { + withResource(GpuScalar.from(null, DataTypes.StringType)) { nullString => + isFixedPoint.ifElse(withoutWhitespace, nullString) } } } @@ -260,7 +249,7 @@ object GpuCast extends Arm { withResource(isFloat.all()) { allMatch => // Check that all non-null values are valid floats. if (allMatch.isValid && !allMatch.getBoolean) { - throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) + throw new NumberFormatException(GpuCast.INVALID_NUMBER_MSG) } infWithoutPlus.incRefCount() } @@ -835,25 +824,28 @@ object GpuCast extends Arm { DecimalType(dt.precision + 1, dt.scale + 1) } - withResource(GpuCast.sanitizeStringToDecimal(input, ansiEnabled)) { sanitized => - val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt) - val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt) - withResource(sanitized.isFixedPoint(interimDt)) { isFixedPoints => - if (ansiEnabled) { - withResource(isFixedPoints.all()) { allFixedPoints => - if (allFixedPoints.isValid && !allFixedPoints.getBoolean) { - throw new ArithmeticException(s"One or more values cannot be " + - s"represented as Decimal(${dt.precision}, ${dt.scale})") - } + val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt) + val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt) + val isFixedPoints = withResource(GpuCast.sanitizeStringToDecimal(input, ansiEnabled)) { + // We further filter out invalid values using the cuDF isFixedPoint method. + sanitized => sanitized.isFixedPoint(interimDt) + } + + withResource(isFixedPoints) { isFixedPoints => + if (ansiEnabled) { + withResource(isFixedPoints.all()) { allFixedPoints => + if (allFixedPoints.isValid && !allFixedPoints.getBoolean) { + throw new ArithmeticException(s"One or more values cannot be " + + s"represented as Decimal(${dt.precision}, ${dt.scale})") } } - // intermediate step needed so we can make sure we can round up - withResource(input.castTo(interimDt)) { interimDecimals => - withResource(Scalar.fromNull(interimDt)) { nulls => - withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals => - // cast Decimal to the Decimal that's needed - castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled) - } + } + // intermediate step needed so we can make sure we can round up + withResource(input.castTo(interimDt)) { interimDecimals => + withResource(Scalar.fromNull(interimDt)) { nulls => + withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals => + // cast Decimal to the Decimal that's needed + castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled) } } } @@ -887,7 +879,7 @@ object GpuCast extends Arm { withResource(nanOrFloat.all()) { allNanOrFloat => // Check that all non-null values are valid floats or NaN. if (allNanOrFloat.isValid && !allNanOrFloat.getBoolean) { - throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) + throw new NumberFormatException(GpuCast.INVALID_NUMBER_MSG) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 751b1013664..88929625d65 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -216,6 +216,7 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE]( */ final def canThisBeReplaced: Boolean = cannotBeReplacedReasons.exists(_.isEmpty) + /** * Returns true iff this must be replaced because its children have already been * replaced and this needs to also be replaced for compatibility. diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala index 871e86a0152..aae1c2ed028 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala @@ -19,13 +19,13 @@ package org.apache.spark.sql.rapids import java.lang.invoke.SerializedLambda import com.nvidia.spark.RapidsUDF -import com.nvidia.spark.rapids.{DataFromReplacementRule, ExprMeta, GpuExpression, GpuRowBasedUserDefinedFunction, GpuUserDefinedFunction, RapidsConf, RapidsMeta, VersionUtils} +import com.nvidia.spark.rapids.{DataFromReplacementRule, ExprMeta, GpuExpression, GpuRowBasedUserDefinedFunction, GpuUserDefinedFunction, RapidsConf, RapidsMeta} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF, SpecializedGetters} import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType} +import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType} case class GpuScalaUDF( function: RapidsUDF, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala index 4ff7eba53a6..851dc03a7f3 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala @@ -408,22 +408,22 @@ class AnsiCastOpSuite extends GpuExpressionTestSuite { } testCastFailsForBadInputs("Test bad cast 1 from strings to floats", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c0").cast(FloatType)) } testCastFailsForBadInputs("Test bad cast 2 from strings to floats", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c1").cast(FloatType)) } testCastFailsForBadInputs("Test bad cast 1 from strings to double", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c0").cast(DoubleType)) } testCastFailsForBadInputs("Test bad cast 2 from strings to double", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c1").cast(DoubleType)) } From 38dc136ec6fde51dc479c9d738f4bbab53e9804c Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 16 Nov 2021 15:20:39 -0800 Subject: [PATCH 6/7] removed empty line Signed-off-by: Raza Jafri --- .../src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 88929625d65..751b1013664 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -216,7 +216,6 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE]( */ final def canThisBeReplaced: Boolean = cannotBeReplacedReasons.exists(_.isEmpty) - /** * Returns true iff this must be replaced because its children have already been * replaced and this needs to also be replaced for compatibility. From aadbf4552bd3078998da05a1b1e4c927961795e3 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 17 Nov 2021 14:54:27 -0800 Subject: [PATCH 7/7] removed the sanitize decimal method Signed-off-by: Raza Jafri --- .../com/nvidia/spark/rapids/GpuCast.scala | 45 +------------------ 1 file changed, 2 insertions(+), 43 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 9e0ce1d43a7..80eefd4edf0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -153,47 +153,6 @@ object GpuCast extends Arm { val INVALID_NUMBER_MSG: String = "At least one value is either null or is an invalid number" - def sanitizeStringToDecimal( - input: ColumnView, - ansiEnabled: Boolean): ColumnVector = { - - // This regex gets applied to filter out strings that aren't fixed-point. - val VALID_DEC_REGEX = - "^" + // start of line - "[+\\-]?" + // optional + or - at start of string - "(" + - "(" + - "(" + - "([0-9]+)|" + // digits, OR - "([0-9]*\\.[0-9]+)|" + // decimal with optional leading and mandatory trailing, OR - "([0-9]+\\.[0-9]*)" + // decimal with mandatory leading and optional trailing - ")" + - "([eE][+\\-]?[0-9]+)?" + // exponent - ")" + - ")" + - "$" // end of line - - withResource(input.strip()) { withoutWhitespace => - // filter out any strings that are not valid fixed-point numbers according - // to the regex pattern - withResource(withoutWhitespace.matchesRe(VALID_DEC_REGEX)) { isFixedPoint => - if (ansiEnabled) { - withResource(isFixedPoint.all()) { allMatch => - // Check that all non-null values are valid floats. - if (allMatch.isValid && !allMatch.getBoolean) { - throw new NumberFormatException(GpuCast.INVALID_NUMBER_MSG) - } - withoutWhitespace.incRefCount() - } - } else { - withResource(GpuScalar.from(null, DataTypes.StringType)) { nullString => - isFixedPoint.ifElse(withoutWhitespace, nullString) - } - } - } - } - } - def sanitizeStringToFloat( input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { @@ -826,9 +785,9 @@ object GpuCast extends Arm { val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt) val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt) - val isFixedPoints = withResource(GpuCast.sanitizeStringToDecimal(input, ansiEnabled)) { + val isFixedPoints = withResource(input.strip()) { // We further filter out invalid values using the cuDF isFixedPoint method. - sanitized => sanitized.isFixedPoint(interimDt) + _.isFixedPoint(interimDt) } withResource(isFixedPoints) { isFixedPoints =>