Skip to content

Commit

Permalink
Improve compatibility when reading timestamps from JSON and CSV sourc…
Browse files Browse the repository at this point in the history
…es (#4938)

Signed-off-by: Andy Grove <[email protected]>
  • Loading branch information
andygrove authored Mar 24, 2022
1 parent 1d3b836 commit ceb3558
Show file tree
Hide file tree
Showing 27 changed files with 431 additions and 284 deletions.
23 changes: 8 additions & 15 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,6 @@ will produce a different result compared to the plugin.

## CSV Reading

Due to inconsistencies between how CSV data is parsed CSV parsing is off by default.
Each data type can be enabled or disabled independently using the following configs.

* [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled)

If you know that your particular data type will be parsed correctly enough, you may enable each
type you expect to use. Often the performance improvement is so good that it is worth
checking if it is parsed correctly.

Spark is generally very strict when reading CSV and if the data does not conform with the
expected format exactly it will result in a `null` value. The underlying parser that the RAPIDS Accelerator
uses is much more lenient. If you have badly formatted CSV data you may get data back instead of
nulls.

Spark allows for stripping leading and trailing white space using various options that are off by
default. The plugin will strip leading and trailing space for all values except strings.

Expand Down Expand Up @@ -335,6 +321,7 @@ portion followed by one of the following formats:

* `HH:mm:ss.SSSXXX`
* `HH:mm:ss[.SSS][XXX]`
* `HH:mm:ss[.SSSXXX]`
* `HH:mm`
* `HH:mm:ss`
* `HH:mm[:ss]`
Expand Down Expand Up @@ -525,9 +512,15 @@ versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` repres
support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is
consistent with the behavior in Spark 3.3.0 and later.

Another limitation of the GPU JSON reader is that it will parse strings containing boolean or numeric values where
Another limitation of the GPU JSON reader is that it will parse strings containing non-string boolean or numeric values where
Spark will treat them as invalid inputs and will just return `null`.

### JSON Timestamps

There is currently no support for reading numeric values as timestamps and null values are returned instead
([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to read as longs and then cast
to timestamp.

### JSON Schema discovery

Spark SQL can automatically infer the schema of a JSON dataset if schema is not provided explicitly. The CPU
Expand Down
1 change: 0 additions & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
Expand Down
11 changes: 5 additions & 6 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@
StructField('number', DoubleType()),
StructField('ignored_b', StringType())])

_enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true',
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}
_enable_all_types_conf = {'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}

def read_csv_df(data_path, schema, spark_tmp_table_factory_ignored, options = {}):
def read_impl(spark):
Expand Down Expand Up @@ -196,6 +195,7 @@ def read_impl(spark):
('ts.csv', _date_schema, {}),
('date.csv', _date_schema, {}),
('ts.csv', _ts_schema, {}),
('str.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
('str.csv', _good_str_schema, {'header': 'true'}),
('no-comments.csv', _three_str_schema, {}),
Expand Down Expand Up @@ -306,7 +306,7 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf, spark_tmp_table_f
conf=updated_conf)

csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy', 'dd-MM-yyyy', 'dd/MM/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
Expand All @@ -322,7 +322,6 @@ def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, a
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
with_cpu_session(
lambda spark : gen_df(spark, gen).write\
Expand Down Expand Up @@ -360,7 +359,6 @@ def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list,
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
Expand Down Expand Up @@ -399,7 +397,8 @@ def test_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_enabled_
lambda spark : gen_df(spark, gen).write\
.option('timestampFormat', full_format)\
.csv(data_path))
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read\
.schema(schema)\
Expand Down
28 changes: 25 additions & 3 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
_date_schema = StructType([
StructField('number', DateType())])

_timestamp_schema = StructType([
StructField('number', TimestampType())])

_string_schema = StructType([
StructField('a', StringType())])

Expand Down Expand Up @@ -144,7 +147,7 @@ def test_json_input_meta(spark_tmp_path, v1_enabled_list):
conf=updated_conf)

json_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy', 'dd-MM-yyyy', 'dd/MM/yyyy']
@pytest.mark.parametrize('date_format', json_supported_date_formats, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_json_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list):
Expand Down Expand Up @@ -244,8 +247,7 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_
def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy, spark_tmp_table_factory):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy,
'spark.rapids.sql.incompatibleDateFormats.enabled': True})
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
f = read_func(std_input_path + '/' + filename, schema, spark_tmp_table_factory, {})
if time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
Expand Down Expand Up @@ -285,6 +287,26 @@ def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, an
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'timestamps.json',
])
@pytest.mark.parametrize('schema', [_timestamp_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_valid_timestamps(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy, \
spark_tmp_table_factory):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
f = read_func(std_input_path + '/' + filename, schema, spark_tmp_table_factory, {})
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@pytest.mark.parametrize('schema', [_string_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_unquoted_chars', ["true"])
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/src/test/resources/date.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
2020-09-16
2020-10-16
2021-09-16
2021-09-16
2021-09-16
2020-50-16
2 changes: 2 additions & 0 deletions integration_tests/src/test/resources/dates.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{ "number": "2020-09-16" }
{ "number": " 2020-09-16" }
{ "number": "2020-09-16 " }
{ "number": "1581-01-01" }
{ "number": "1583-01-01" }
11 changes: 11 additions & 0 deletions integration_tests/src/test/resources/timestamps.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{ "number": "2020-02-25" }
{ "number": "2020-02-25 14:46" }
{ "number": "2020-02-25T14:46" }
{ "number": "2020-02-25 14:46:00" }
{ "number": "2020-02-25T14:46:00" }
{ "number": "2020-02-25T14:46:00 " }
{ "number": "2020-02-25 14:46:00.123" }
{ "number": "2020-02-25T14:46:00.123" }
{ "number": " 2020-02-25T14:46:00.123" }
{ "number": "2020-02-25 14:46:00.123456" }
{ "number": "2020-02-25T14:46:00.123456" }
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/ts.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
2020-09-16
2020-09-16T22:32
2020-09-16 22:32
2020-09-16T22:32:01
2020-09-16T22:32:01
2020-09-16T22:32:01
2020-09-16 22:32:01
2020-09-16T22:32:01Z
2020-09-16 22:32:01Z
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.csv

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String = options.dateFormat
def timestampFormatInRead(options: CSVOptions): String = options.timestampFormat
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.json

object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String = options.dateFormat
def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.csv

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String = options.dateFormat
def timestampFormatInRead(options: CSVOptions): String = options.timestampFormat
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.json

object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String = options.dateFormat
def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def timestampFormatInRead(options: CSVOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
package org.apache.spark.sql.catalyst.csv

import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def timestampFormatInRead(options: CSVOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
package org.apache.spark.sql.catalyst.json

import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
}
Loading

0 comments on commit ceb3558

Please sign in to comment.