Skip to content

Commit

Permalink
Improve support for reading CSV and JSON floating-point values (#4637)
Browse files Browse the repository at this point in the history
* Improve support for reading CSV and JSON floating-point values

Signed-off-by: Andy Grove <[email protected]>

* scalastyle and add comments

* Fix resource leak

Signed-off-by: Andy Grove <[email protected]>

* Add file reader tests for JSON

* more tests

* docs update

* ansi mode tests and bug fix

Signed-off-by: Andy Grove <[email protected]>

* add invalid float values so that ansi is covered correctly by the tests

* add invalid floats to csv_test

* Remove Nan/Inf values from simple_float_values.csv and enable overflow tests. Also update compatibility guide.

Signed-off-by: Andy Grove <[email protected]>

* move withResource to enclose for loop

* fix test regression

* change maxFloatDiff to 1e-6
  • Loading branch information
andygrove authored Feb 2, 2022
1 parent 74bc198 commit 4ce9fe7
Show file tree
Hide file tree
Showing 13 changed files with 171 additions and 46 deletions.
25 changes: 16 additions & 9 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,7 @@ date. Typically, one that overflowed.

### CSV Floating Point

The CSV parser is not able to parse `NaN` values. These are
likely to be turned into null values, as described in this
[issue](https://github.com/NVIDIA/spark-rapids/issues/125).

Some floating-point values also appear to overflow but do not for the CPU as described in this
[issue](https://github.com/NVIDIA/spark-rapids/issues/124).

Any number that overflows will not be turned into a null value.
Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).

Also parsing of some values will not produce bit for bit identical results to what the CPU does.
They are within round-off errors except when they are close enough to overflow to Inf or -Inf which
Expand Down Expand Up @@ -480,7 +473,21 @@ The nested types(array, map and struct) are not supported yet in current version

### JSON Floating Point

Like the CSV reader, the JSON reader has the same floating point issue. Please refer to [CSV Floating Point](#csv-floating-point) section.
Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).

The GPU JSON reader does not support `NaN` and `Inf` values with full compatibility with Spark.

The following are the only formats that are parsed consistently between CPU and GPU. Any other variation, including
these formats when unquoted, will produce `null` on the CPU and may produce valid `NaN` and `Inf` results on the GPU.

```json
{ "number": "NaN" }
{ "number": "Infinity" }
{ "number": "-Infinity" }
```

Another limitation of the GPU JSON reader is that it will parse strings containing floating-point values where
Spark will treat them as invalid inputs and will just return `null`.

## LIKE

Expand Down
34 changes: 20 additions & 14 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -194,7 +194,7 @@ def read_impl(spark):
@pytest.mark.parametrize('name,schema,options', [
('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
pytest.param('ts.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1091')),
pytest.param('ts.csv', _date_schema, {}),
pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
('ts.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
Expand Down Expand Up @@ -224,19 +224,25 @@ def read_impl(spark):
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125i, https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
], ids=idfn)
@pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled_list):
updated_conf=copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
def test_basic_csv_read(std_input_path, name, schema, options, read_func, v1_enabled_list, ansi_enabled):
updated_conf=copy_and_update(_enable_all_types_conf, {
'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled
})
assert_gpu_and_cpu_are_equal_collect(read_func(std_input_path + '/' + name, schema, options),
conf=updated_conf)

Expand All @@ -245,13 +251,13 @@ def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled
# This would require multiLine reads to work correctly so we avoid these chars
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
StringGen('[aAbB ]{0,10}'),
StringGen('[nN][aA][nN]'),
StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen,
DoubleGen(no_nans=True), # NaN, Inf, and -Inf are not supported
# Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
# are fixed we should not have to special case float values any more.
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
DoubleGen(no_nans=False),
pytest.param(double_gen),
pytest.param(FloatGen(no_nans=False)),
pytest.param(float_gen),
TimestampGen()]

@approximate_float
Expand Down
59 changes: 52 additions & 7 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,44 @@
# This would require multiLine reads to work correctly, so we avoid these chars
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
StringGen('[aAbB ]{0,10}'),
StringGen('[nN][aA][nN]'),
StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
byte_gen, short_gen, int_gen, long_gen, boolean_gen,
# Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
# are fixed we should not have to special case float values any more.
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
DoubleGen(no_nans=True)
pytest.param(double_gen),
pytest.param(FloatGen(no_nans=False)),
pytest.param(float_gen),
DoubleGen(no_nans=False)
]

_enable_all_types_conf = {
'spark.rapids.sql.format.json.enabled': 'true',
'spark.rapids.sql.format.json.read.enabled': 'true'}

_float_schema = StructType([
StructField('number', FloatType())])

_double_schema = StructType([
StructField('number', DoubleType())])

def read_json_df(data_path, schema, options = {}):
def read_impl(spark):
reader = spark.read
if not schema is None:
reader = reader.schema(schema)
for key, value in options.items():
reader = reader.option(key, value)
return debug_df(reader.json(data_path))
return read_impl

def read_json_sql(data_path, schema, options = {}):
opts = options
if not schema is None:
opts = copy_and_update(options, {'schema': schema})
def read_impl(spark):
spark.sql('DROP TABLE IF EXISTS `TMP_json_TABLE`')
return spark.catalog.createTable('TMP_json_TABLE', source='json', path=data_path, **opts)
return read_impl

@approximate_float
@pytest.mark.parametrize('data_gen', [
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
Expand Down Expand Up @@ -138,4 +163,24 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
.schema(schema)\
.option('timestampFormat', full_format)\
.json(data_path),
conf=updated_conf)
conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'nan_and_inf.json',
pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
'floats.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
])
@pytest.mark.parametrize('schema', [_float_schema, _double_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, ansi_enabled):
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled})
assert_gpu_and_cpu_are_equal_collect(
read_func(std_input_path + '/' + filename,
schema,
{ "allowNonNumericNumbers": allow_non_numeric_numbers }),
conf=updated_conf)
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/floats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{ "number": -3.141592 }
{ "number": 3.141592 }
{ "number": 0.0 }
{ "number": -0.0 }
{ "number": -3.4028234663852886e+38 }
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/floats_edge_cases.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "number": "-3.141592" }
{ "number": "3.141592" }
{ "number": "-3.4028234663852886e+38" }
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/floats_invalid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"number"
true
false
bad
"bad"
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/floats_invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "number": true }
{ "number": "not a float" }
{ "number": "" }
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf.csv
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"number"
NaN
Inf
+Inf
-Inf
NAN
nan
INF
+INF
-INF
Infinity
+Infinity
-Infinity
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "number": "NaN" }
{ "number": "Infinity" }
{ "number": "-Infinity" }
12 changes: 12 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf_edge_cases.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{ "number": "NAN" }
{ "number": "nan" }
{ "number": "INF" }
{ "number": "+INF" }
{ "number": "-INF" }
{ "number": INF }
{ "number": +INF }
{ "number": -INF }
{ "number": "Inf" }
{ "number": "+Inf" }
{ "number": "-Inf" }
{ "number": "+Infinity" }
10 changes: 1 addition & 9 deletions integration_tests/src/test/resources/simple_float_values.csv
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,4 @@ bad
1.7976931348623157E308
1.7976931348623157e+308
1.7976931348623158E308
1.2e-234
NAN
nan
NaN
Inf
-Inf
INF
-INF

1.2e-234
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.nvidia.spark.rapids

import scala.collection.mutable.ListBuffer
import scala.math.max

import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
Expand All @@ -27,7 +28,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
Expand Down Expand Up @@ -164,7 +165,19 @@ abstract class GpuTextBasedPartitionReader(
} else {
readDataSchema
}
val cudfSchema = GpuColumnVector.from(dataSchema)

// read floating-point columns as strings in cuDF
val dataSchemaWithStrings = StructType(dataSchema.fields
.map(f => {
f.dataType match {
case DataTypes.FloatType | DataTypes.DoubleType =>
f.copy(dataType = DataTypes.StringType)
case _ =>
f
}
}))
val cudfSchema = GpuColumnVector.from(dataSchemaWithStrings)

// about to start using the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))

Expand All @@ -175,7 +188,30 @@ abstract class GpuTextBasedPartitionReader(
}
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)

handleResult(newReadDataSchema, table)
// parse floating-point columns that were read as strings
val castTable = withResource(table) { _ =>
val columns = new ListBuffer[ColumnVector]()
// Table increases the ref counts on the columns so we have
// to close them after creating the table
withResource(columns) { _ =>
// ansi mode does not apply to text inputs
val ansiEnabled = false
for (i <- 0 until table.getNumberOfColumns) {
val castColumn = dataSchema.fields(i).dataType match {
case DataTypes.FloatType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
case DataTypes.DoubleType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
case _ =>
table.getColumn(i).incRefCount()
}
columns += castColumn
}
new Table(columns: _*)
}
}

handleResult(newReadDataSchema, castTable)
}
} finally {
dataBuffer.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
conf: SparkConf = new SparkConf(),
execsAllowedNonGpu: Seq[String] = Seq.empty,
batchSize: Int = 0,
repart: Int = 1)
repart: Int = 1,
maxFloatDiff: Double = 0.0)
(fn: DataFrame => DataFrame) {
if (batchSize > 0) {
makeBatchedBytes(batchSize, conf)
Expand All @@ -69,7 +70,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
testSparkResultsAreEqual(testName, df,
conf = conf, repart = repart,
execsAllowedNonGpu = execsAllowedNonGpu,
incompat = true, sort = true)(fn)
incompat = true, sort = true, maxFloatDiff = maxFloatDiff)(fn)
}

def firstDf(spark: SparkSession): DataFrame = {
Expand Down Expand Up @@ -637,6 +638,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
FLOAT_TEST_testSparkResultsAreEqual(
"doubles basic aggregates group by doubles",
doubleCsvDf,
maxFloatDiff = 0.000001,
conf = makeBatchedBytes(3, enableCsvConf())) {
frame => frame.groupBy("doubles").agg(
lit(456f),
Expand All @@ -653,6 +655,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
FLOAT_TEST_testSparkResultsAreEqual(
"doubles basic aggregates group by more_doubles",
doubleCsvDf,
maxFloatDiff = 0.000001,
conf = makeBatchedBytes(3, enableCsvConf())) {
frame => frame.groupBy("more_doubles").agg(
lit(456f),
Expand Down

0 comments on commit 4ce9fe7

Please sign in to comment.