
Improve support for reading CSV and JSON floating-point values #4637

Merged
merged 13 commits on Feb 2, 2022
25 changes: 16 additions & 9 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,7 @@ date. Typically, one that overflowed.

### CSV Floating Point

The CSV parser is not able to parse `NaN` values. These are
likely to be turned into null values, as described in this
[issue](https://github.com/NVIDIA/spark-rapids/issues/125).

Some floating-point values also appear to overflow but do not for the CPU as described in this
[issue](https://github.com/NVIDIA/spark-rapids/issues/124).

Any number that overflows will not be turned into a null value.
Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).
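
For example, here is a minimal sketch (the Spark session setup and file path are assumptions for illustration only, not part of this change) of a CSV read that goes through this string-to-float conversion path:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType

spark = SparkSession.builder.getOrCreate()

# Values such as "NaN", "Infinity", or numbers near the edge of the double
# range are the cases where the GPU result may differ as described below.
schema = StructType([StructField("number", DoubleType())])
df = spark.read.schema(schema).option("header", "true").csv("/tmp/simple_float_values.csv")
df.show()
```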

Also, parsing of some values will not produce bit-for-bit identical results to what the CPU does.
They are within round-off errors except when they are close enough to overflow to Inf or -Inf, which
Expand Down Expand Up @@ -480,7 +473,21 @@ The nested types(array, map and struct) are not supported yet in current version

### JSON Floating Point

Like the CSV reader, the JSON reader has the same floating point issue. Please refer to [CSV Floating Point](#csv-floating-point) section.
Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).

The GPU JSON reader does not support `NaN` and `Inf` values in a way that is fully compatible with Spark.

The following are the only formats that are parsed consistently between CPU and GPU. Any other variation, including
these formats when unquoted, will produce `null` on the CPU and may produce valid `NaN` and `Inf` results on the GPU.

```json
{ "number": "NaN" }
{ "number": "Infinity" }
{ "number": "-Infinity" }
```

Another limitation of the GPU JSON reader is that it will parse strings containing floating-point values,
whereas Spark treats these as invalid inputs and returns `null`.
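
As an illustration only (a minimal sketch; the Spark session setup and file path are assumptions, not part of this change), reading a file that contains a quoted value such as `{ "number": "3.141592" }` shows this difference:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType

spark = SparkSession.builder.getOrCreate()

# With a line such as { "number": "3.141592" }, Spark on the CPU returns null
# for the quoted string, while the GPU reader may parse it as a float.
schema = StructType([StructField("number", FloatType())])
df = spark.read.schema(schema).json("/tmp/floats_edge_cases.json")
df.show()
```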

## LIKE

34 changes: 20 additions & 14 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -194,7 +194,7 @@ def read_impl(spark):
@pytest.mark.parametrize('name,schema,options', [
('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
pytest.param('ts.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1091')),
pytest.param('ts.csv', _date_schema, {}),
pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
('ts.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
Expand Down Expand Up @@ -224,19 +224,25 @@ def read_impl(spark):
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
], ids=idfn)
@pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled_list):
updated_conf=copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
def test_basic_csv_read(std_input_path, name, schema, options, read_func, v1_enabled_list, ansi_enabled):
    updated_conf = copy_and_update(_enable_all_types_conf, {
        'spark.sql.sources.useV1SourceList': v1_enabled_list,
        'spark.sql.ansi.enabled': ansi_enabled
    })
    assert_gpu_and_cpu_are_equal_collect(read_func(std_input_path + '/' + name, schema, options),
            conf=updated_conf)

Expand All @@ -245,13 +251,13 @@ def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled
# This would require multiLine reads to work correctly so we avoid these chars
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
StringGen('[aAbB ]{0,10}'),
StringGen('[nN][aA][nN]'),
StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen,
DoubleGen(no_nans=True), # NaN, Inf, and -Inf are not supported
# Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
# are fixed we should not have to special case float values any more.
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
DoubleGen(no_nans=False),
pytest.param(double_gen),
pytest.param(FloatGen(no_nans=False)),
pytest.param(float_gen),
TimestampGen()]

@approximate_float
59 changes: 52 additions & 7 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,44 @@
# This would require multiLine reads to work correctly, so we avoid these chars
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
StringGen('[aAbB ]{0,10}'),
StringGen('[nN][aA][nN]'),
StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
byte_gen, short_gen, int_gen, long_gen, boolean_gen,
# Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
# are fixed we should not have to special case float values any more.
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
DoubleGen(no_nans=True)
pytest.param(double_gen),
pytest.param(FloatGen(no_nans=False)),
pytest.param(float_gen),
DoubleGen(no_nans=False)
]

_enable_all_types_conf = {
'spark.rapids.sql.format.json.enabled': 'true',
'spark.rapids.sql.format.json.read.enabled': 'true'}

_float_schema = StructType([
    StructField('number', FloatType())])

_double_schema = StructType([
    StructField('number', DoubleType())])

def read_json_df(data_path, schema, options = {}):
    def read_impl(spark):
        reader = spark.read
        if schema is not None:
            reader = reader.schema(schema)
        for key, value in options.items():
            reader = reader.option(key, value)
        return debug_df(reader.json(data_path))
    return read_impl

def read_json_sql(data_path, schema, options = {}):
    opts = options
    if schema is not None:
        opts = copy_and_update(options, {'schema': schema})
    def read_impl(spark):
        spark.sql('DROP TABLE IF EXISTS `TMP_json_TABLE`')
        return spark.catalog.createTable('TMP_json_TABLE', source='json', path=data_path, **opts)
    return read_impl

@approximate_float
@pytest.mark.parametrize('data_gen', [
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
Expand Down Expand Up @@ -138,4 +163,24 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
.schema(schema)\
.option('timestampFormat', full_format)\
.json(data_path),
conf=updated_conf)
conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'nan_and_inf.json',
pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
'floats.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
])
@pytest.mark.parametrize('schema', [_float_schema, _double_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, ansi_enabled):
    updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled})
    assert_gpu_and_cpu_are_equal_collect(
        read_func(std_input_path + '/' + filename,
                  schema,
                  { "allowNonNumericNumbers": allow_non_numeric_numbers }),
        conf=updated_conf)
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/floats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{ "number": -3.141592 }
{ "number": 3.141592 }
{ "number": 0.0 }
{ "number": -0.0 }
{ "number": -3.4028234663852886e+38 }
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/floats_edge_cases.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "number": "-3.141592" }
{ "number": "3.141592" }
{ "number": "-3.4028234663852886e+38" }
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/floats_invalid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"number"
true
false
bad
"bad"
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/floats_invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "number": true }
{ "number": "not a float" }
{ "number": "" }
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf.csv
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"number"
NaN
Inf
+Inf
-Inf
NAN
nan
INF
+INF
-INF
Infinity
+Infinity
-Infinity
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "number": "NaN" }
{ "number": "Infinity" }
{ "number": "-Infinity" }
12 changes: 12 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf_edge_cases.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{ "number": "NAN" }
{ "number": "nan" }
{ "number": "INF" }
{ "number": "+INF" }
{ "number": "-INF" }
{ "number": INF }
{ "number": +INF }
{ "number": -INF }
{ "number": "Inf" }
{ "number": "+Inf" }
{ "number": "-Inf" }
{ "number": "+Infinity" }
10 changes: 1 addition & 9 deletions integration_tests/src/test/resources/simple_float_values.csv
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,4 @@ bad
1.7976931348623157E308
1.7976931348623157e+308
1.7976931348623158E308
1.2e-234
NAN
nan
NaN
Inf
-Inf
INF
-INF

PR author comment: NaN and Inf values are already covered in nan_and_inf.csv

1.2e-234
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.nvidia.spark.rapids

import scala.collection.mutable.ListBuffer
import scala.math.max

import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
Expand All @@ -27,7 +28,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
Expand Down Expand Up @@ -164,7 +165,19 @@ abstract class GpuTextBasedPartitionReader(
} else {
readDataSchema
}
val cudfSchema = GpuColumnVector.from(dataSchema)

// read floating-point columns as strings in cuDF
val dataSchemaWithStrings = StructType(dataSchema.fields
.map(f => {
f.dataType match {
case DataTypes.FloatType | DataTypes.DoubleType =>
f.copy(dataType = DataTypes.StringType)
case _ =>
f
}
}))
val cudfSchema = GpuColumnVector.from(dataSchemaWithStrings)

// about to start using the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))

Expand All @@ -175,7 +188,30 @@ abstract class GpuTextBasedPartitionReader(
}
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)

handleResult(newReadDataSchema, table)
// parse floating-point columns that were read as strings
val castTable = withResource(table) { _ =>
val columns = new ListBuffer[ColumnVector]()
// Table increases the ref counts on the columns so we have
// to close them after creating the table
withResource(columns) { _ =>
// ansi mode does not apply to text inputs
val ansiEnabled = false
for (i <- 0 until table.getNumberOfColumns) {
val castColumn = dataSchema.fields(i).dataType match {
case DataTypes.FloatType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
case DataTypes.DoubleType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
case _ =>
table.getColumn(i).incRefCount()
}
columns += castColumn
}
new Table(columns: _*)
}
}

handleResult(newReadDataSchema, castTable)
}
} finally {
dataBuffer.close()
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
conf: SparkConf = new SparkConf(),
execsAllowedNonGpu: Seq[String] = Seq.empty,
batchSize: Int = 0,
repart: Int = 1)
repart: Int = 1,
maxFloatDiff: Double = 0.0)
(fn: DataFrame => DataFrame) {
if (batchSize > 0) {
makeBatchedBytes(batchSize, conf)
Expand All @@ -69,7 +70,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
testSparkResultsAreEqual(testName, df,
conf = conf, repart = repart,
execsAllowedNonGpu = execsAllowedNonGpu,
incompat = true, sort = true)(fn)
incompat = true, sort = true, maxFloatDiff = maxFloatDiff)(fn)
}

def firstDf(spark: SparkSession): DataFrame = {
Expand Down Expand Up @@ -637,6 +638,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
FLOAT_TEST_testSparkResultsAreEqual(
"doubles basic aggregates group by doubles",
doubleCsvDf,
maxFloatDiff = 0.000001,
conf = makeBatchedBytes(3, enableCsvConf())) {
frame => frame.groupBy("doubles").agg(
lit(456f),
Expand All @@ -653,6 +655,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
FLOAT_TEST_testSparkResultsAreEqual(
"doubles basic aggregates group by more_doubles",
doubleCsvDf,
maxFloatDiff = 0.000001,
conf = makeBatchedBytes(3, enableCsvConf())) {
frame => frame.groupBy("more_doubles").agg(
lit(456f),