Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve compatibility when reading timestamps from JSON and CSV sources #4938

Merged
merged 23 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,6 @@ will produce a different result compared to the plugin.

## CSV Reading

Due to inconsistencies between how CSV data is parsed CSV parsing is off by default.
Each data type can be enabled or disabled independently using the following configs.

* [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled)

If you know that your particular data type will be parsed correctly enough, you may enable each
type you expect to use. Often the performance improvement is so good that it is worth
checking if it is parsed correctly.

Spark is generally very strict when reading CSV and if the data does not conform with the
expected format exactly it will result in a `null` value. The underlying parser that the RAPIDS Accelerator
uses is much more lenient. If you have badly formatted CSV data you may get data back instead of
nulls.

Spark allows for stripping leading and trailing white space using various options that are off by
default. The plugin will strip leading and trailing space for all values except strings.

Expand Down Expand Up @@ -335,6 +321,7 @@ portion followed by one of the following formats:

* `HH:mm:ss.SSSXXX`
* `HH:mm:ss[.SSS][XXX]`
* `HH:mm:ss[.SSSXXX]`
* `HH:mm`
* `HH:mm:ss`
* `HH:mm[:ss]`
Expand Down Expand Up @@ -451,9 +438,15 @@ versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` repres
support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is
consistent with the behavior in Spark 3.3.0 and later.

Another limitation of the GPU JSON reader is that it will parse strings containing boolean or numeric values where
Another limitation of the GPU JSON reader is that it will parse strings containing non-string boolean or numeric values where
Spark will treat them as invalid inputs and will just return `null`.

### JSON Timestamps

There is currently no support for reading numeric values as timestamps and null values are returned instead
([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to read as longs and then cast
to timestamp.

### JSON Schema discovery

Spark SQL can automatically infer the schema of a JSON dataset if schema is not provided explicitly. The CPU
Expand Down
1 change: 0 additions & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
Expand Down
11 changes: 5 additions & 6 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@
StructField('number', DoubleType()),
StructField('ignored_b', StringType())])

_enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true',
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}
_enable_all_types_conf = {'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}

def read_csv_df(data_path, schema, options = {}):
def read_impl(spark):
Expand Down Expand Up @@ -196,6 +195,7 @@ def read_impl(spark):
('ts.csv', _date_schema, {}),
('date.csv', _date_schema, {}),
('ts.csv', _ts_schema, {}),
('str.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
('str.csv', _good_str_schema, {'header': 'true'}),
('no-comments.csv', _three_str_schema, {}),
Expand Down Expand Up @@ -306,7 +306,7 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
conf=updated_conf)

csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy', 'dd-MM-yyyy', 'dd/MM/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
Expand All @@ -322,7 +322,6 @@ def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, a
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
with_cpu_session(
lambda spark : gen_df(spark, gen).write\
Expand Down Expand Up @@ -360,7 +359,6 @@ def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list,
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
Expand Down Expand Up @@ -399,7 +397,8 @@ def test_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_enabled_
lambda spark : gen_df(spark, gen).write\
.option('timestampFormat', full_format)\
.csv(data_path))
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read\
.schema(schema)\
Expand Down
27 changes: 24 additions & 3 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
_date_schema = StructType([
StructField('number', DateType())])

_timestamp_schema = StructType([
StructField('number', TimestampType())])

_string_schema = StructType([
StructField('a', StringType())])

Expand Down Expand Up @@ -144,7 +147,7 @@ def test_json_input_meta(spark_tmp_path, v1_enabled_list):
conf=updated_conf)

json_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy', 'dd-MM-yyyy', 'dd/MM/yyyy']
@pytest.mark.parametrize('date_format', json_supported_date_formats, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_json_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list):
Expand Down Expand Up @@ -243,8 +246,7 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_
def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy,
'spark.rapids.sql.incompatibleDateFormats.enabled': True})
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
f = read_func(std_input_path + '/' + filename, schema, {})
if time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
Expand Down Expand Up @@ -284,6 +286,25 @@ def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, an
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'timestamps.json',
])
@pytest.mark.parametrize('schema', [_timestamp_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_valid_timestamps(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
f = read_func(std_input_path + '/' + filename, schema, {})
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@pytest.mark.parametrize('schema', [_string_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_unquoted_chars', ["true"])
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/src/test/resources/date.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
2020-09-16
2020-10-16
2021-09-16
2021-09-16
2021-09-16
2020-50-16
2 changes: 2 additions & 0 deletions integration_tests/src/test/resources/dates.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{ "number": "2020-09-16" }
{ "number": " 2020-09-16" }
{ "number": "2020-09-16 " }
{ "number": "1581-01-01" }
{ "number": "1583-01-01" }
11 changes: 11 additions & 0 deletions integration_tests/src/test/resources/timestamps.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{ "number": "2020-02-25" }
{ "number": "2020-02-25 14:46" }
{ "number": "2020-02-25T14:46" }
{ "number": "2020-02-25 14:46:00" }
{ "number": "2020-02-25T14:46:00" }
{ "number": "2020-02-25T14:46:00 " }
{ "number": "2020-02-25 14:46:00.123" }
{ "number": "2020-02-25T14:46:00.123" }
{ "number": " 2020-02-25T14:46:00.123" }
{ "number": "2020-02-25 14:46:00.123456" }
{ "number": "2020-02-25T14:46:00.123456" }
5 changes: 5 additions & 0 deletions integration_tests/src/test/resources/ts.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
2020-09-16
2020-09-16T22:32
2020-09-16 22:32
2020-09-16T22:32:01
2020-09-16T22:32:01
2020-09-16T22:32:01
2020-09-16 22:32:01
2020-09-16T22:32:01Z
2020-09-16 22:32:01Z
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package com.nvidia.spark.rapids.shims
import com.nvidia.spark.rapids._

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.json.rapids.shims.Spark30Xuntil33XFileOptionsShims
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.v2._

trait Spark30Xuntil33XShims extends Spark30Xuntil33XFileOptionsShims {
trait Spark30Xuntil33XShims extends SparkShims {

def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: SparkPlan] = {
GpuOverrides.neverReplaceExec[ShowCurrentNamespaceExec]("Namespace metadata operation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.csv

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String = options.dateFormat
def timestampFormatInRead(options: CSVOptions): String = options.timestampFormat
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.json

object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String = options.dateFormat
def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Coalesce, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, TimeAdd}
import org.apache.spark.sql.catalyst.json.rapids.shims.Spark33XFileOptionsShims
import org.apache.spark.sql.execution.{BaseSubqueryExec, CoalesceExec, FileSourceScanExec, InSubqueryExec, ProjectExec, ReusedSubqueryExec, SparkPlan, SubqueryBroadcastExec}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, HadoopFsRelation, PartitionedFile}
Expand All @@ -34,7 +33,7 @@ import org.apache.spark.sql.rapids.shims.GpuTimeAdd
import org.apache.spark.sql.types.{CalendarIntervalType, DayTimeIntervalType, StructType}
import org.apache.spark.unsafe.types.CalendarInterval

trait Spark33XShims extends Spark33XFileOptionsShims {
trait Spark33XShims extends Spark321PlusShims {

/**
* For spark3.3+ optionally return null if element not exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
package org.apache.spark.sql.catalyst.csv

import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def timestampFormatInRead(options: CSVOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
package org.apache.spark.sql.catalyst.json

import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
}

This file was deleted.

Loading