diff --git a/docs/compatibility.md b/docs/compatibility.md index ada04f79e93..ceaa87077fa 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -280,20 +280,6 @@ will produce a different result compared to the plugin. ## CSV Reading -Due to inconsistencies between how CSV data is parsed CSV parsing is off by default. -Each data type can be enabled or disabled independently using the following configs. - - * [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled) - -If you know that your particular data type will be parsed correctly enough, you may enable each -type you expect to use. Often the performance improvement is so good that it is worth -checking if it is parsed correctly. - -Spark is generally very strict when reading CSV and if the data does not conform with the -expected format exactly it will result in a `null` value. The underlying parser that the RAPIDS Accelerator -uses is much more lenient. If you have badly formatted CSV data you may get data back instead of -nulls. - Spark allows for stripping leading and trailing white space using various options that are off by default. The plugin will strip leading and trailing space for all values except strings. @@ -335,6 +321,7 @@ portion followed by one of the following formats: * `HH:mm:ss.SSSXXX` * `HH:mm:ss[.SSS][XXX]` +* `HH:mm:ss[.SSSXXX]` * `HH:mm` * `HH:mm:ss` * `HH:mm[:ss]` @@ -525,9 +512,15 @@ versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` repres support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is consistent with the behavior in Spark 3.3.0 and later. -Another limitation of the GPU JSON reader is that it will parse strings containing boolean or numeric values where +Another limitation of the GPU JSON reader is that it will parse strings containing non-string boolean or numeric values where Spark will treat them as invalid inputs and will just return `null`. +### JSON Timestamps + +There is currently no support for reading numeric values as timestamps and null values are returned instead +([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to read as longs and then cast +to timestamp. + ### JSON Schema discovery Spark SQL can automatically infer the schema of a JSON dataset if schema is not provided explicitly. The CPU diff --git a/docs/configs.md b/docs/configs.md index dd7ab036bd2..d5c5b181e76 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -67,7 +67,6 @@ Name | Description | Default Value spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. 
Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1 -spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 9635034c9a4..99f4b8a84ff 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -167,8 +167,7 @@ StructField('number', DoubleType()), StructField('ignored_b', StringType())]) -_enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true', - 'spark.sql.legacy.timeParserPolicy': 'CORRECTED'} +_enable_all_types_conf = {'spark.sql.legacy.timeParserPolicy': 'CORRECTED'} def read_csv_df(data_path, schema, spark_tmp_table_factory_ignored, options = {}): def read_impl(spark): @@ -196,6 +195,7 @@ def read_impl(spark): ('ts.csv', _date_schema, {}), ('date.csv', _date_schema, {}), ('ts.csv', _ts_schema, {}), + ('str.csv', _ts_schema, {}), ('str.csv', _bad_str_schema, {'header': 'true'}), ('str.csv', _good_str_schema, {'header': 'true'}), ('no-comments.csv', _three_str_schema, {}), @@ -306,7 +306,7 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf, spark_tmp_table_f conf=updated_conf) csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM', - 'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy'] + 'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy', 'dd-MM-yyyy', 'dd/MM/yyyy'] @pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "csv"]) @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) @@ -322,7 +322,6 @@ def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, a updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list, 'spark.sql.ansi.enabled': ansi_enabled, - 'spark.rapids.sql.incompatibleDateFormats.enabled': True, 'spark.sql.legacy.timeParserPolicy': time_parser_policy}) with_cpu_session( lambda spark : gen_df(spark, gen).write\ @@ -360,7 +359,6 @@ def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list, updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list, 'spark.sql.ansi.enabled': ansi_enabled, - 'spark.rapids.sql.incompatibleDateFormats.enabled': True, 'spark.sql.legacy.timeParserPolicy': time_parser_policy}) if time_parser_policy == 'EXCEPTION': assert_gpu_and_cpu_error( @@ -399,7 +397,8 @@ def test_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_enabled_ lambda spark : gen_df(spark, gen).write\ .option('timestampFormat', full_format)\ .csv(data_path)) - 
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read\ .schema(schema)\ diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 2ea4ee3b815..98e7ecb10f4 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -69,6 +69,9 @@ _date_schema = StructType([ StructField('number', DateType())]) +_timestamp_schema = StructType([ + StructField('number', TimestampType())]) + _string_schema = StructType([ StructField('a', StringType())]) @@ -144,7 +147,7 @@ def test_json_input_meta(spark_tmp_path, v1_enabled_list): conf=updated_conf) json_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM', - 'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy'] + 'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy', 'dd-MM-yyyy', 'dd/MM/yyyy'] @pytest.mark.parametrize('date_format', json_supported_date_formats, ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "json"]) def test_json_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list): @@ -244,8 +247,7 @@ def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_ def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy, spark_tmp_table_factory): updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled, - 'spark.sql.legacy.timeParserPolicy': time_parser_policy, - 'spark.rapids.sql.incompatibleDateFormats.enabled': True}) + 'spark.sql.legacy.timeParserPolicy': time_parser_policy}) f = read_func(std_input_path + '/' + filename, schema, spark_tmp_table_factory, {}) if time_parser_policy == 'LEGACY' and ansi_enabled == 'true': assert_gpu_fallback_collect( @@ -285,6 +287,26 @@ def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, an else: assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf) +@approximate_float +@pytest.mark.parametrize('filename', [ + 'timestamps.json', +]) +@pytest.mark.parametrize('schema', [_timestamp_schema]) +@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) +@pytest.mark.parametrize('ansi_enabled', ["true", "false"]) +@pytest.mark.parametrize('time_parser_policy', [ + pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), + 'CORRECTED', + 'EXCEPTION' +]) +def test_json_read_valid_timestamps(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy, \ + spark_tmp_table_factory): + updated_conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.ansi.enabled': ansi_enabled, + 'spark.sql.legacy.timeParserPolicy': time_parser_policy}) + f = read_func(std_input_path + '/' + filename, schema, spark_tmp_table_factory, {}) + assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf) + @pytest.mark.parametrize('schema', [_string_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_unquoted_chars', ["true"]) diff --git a/integration_tests/src/test/resources/date.csv b/integration_tests/src/test/resources/date.csv index 300b64dcc01..72fd528989a 100644 --- a/integration_tests/src/test/resources/date.csv +++ b/integration_tests/src/test/resources/date.csv @@ -1,4 +1,6 @@ 2020-09-16 2020-10-16 +2021-09-16 + 
2021-09-16 2021-09-16 2020-50-16 diff --git a/integration_tests/src/test/resources/dates.json b/integration_tests/src/test/resources/dates.json index b551b4edfa8..1fdfc3b4320 100644 --- a/integration_tests/src/test/resources/dates.json +++ b/integration_tests/src/test/resources/dates.json @@ -1,3 +1,5 @@ { "number": "2020-09-16" } +{ "number": " 2020-09-16" } +{ "number": "2020-09-16 " } { "number": "1581-01-01" } { "number": "1583-01-01" } \ No newline at end of file diff --git a/integration_tests/src/test/resources/timestamps.json b/integration_tests/src/test/resources/timestamps.json new file mode 100644 index 00000000000..69df364705a --- /dev/null +++ b/integration_tests/src/test/resources/timestamps.json @@ -0,0 +1,11 @@ +{ "number": "2020-02-25" } +{ "number": "2020-02-25 14:46" } +{ "number": "2020-02-25T14:46" } +{ "number": "2020-02-25 14:46:00" } +{ "number": "2020-02-25T14:46:00" } +{ "number": "2020-02-25T14:46:00 " } +{ "number": "2020-02-25 14:46:00.123" } +{ "number": "2020-02-25T14:46:00.123" } +{ "number": " 2020-02-25T14:46:00.123" } +{ "number": "2020-02-25 14:46:00.123456" } +{ "number": "2020-02-25T14:46:00.123456" } diff --git a/integration_tests/src/test/resources/ts.csv b/integration_tests/src/test/resources/ts.csv index f8db5b031bc..b7a91a236c6 100644 --- a/integration_tests/src/test/resources/ts.csv +++ b/integration_tests/src/test/resources/ts.csv @@ -1,4 +1,9 @@ 2020-09-16 2020-09-16T22:32 +2020-09-16 22:32 2020-09-16T22:32:01 + 2020-09-16T22:32:01 +2020-09-16T22:32:01 +2020-09-16 22:32:01 2020-09-16T22:32:01Z +2020-09-16 22:32:01Z diff --git a/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala index b8736640a9f..866fa191e46 100644 --- a/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala +++ b/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.csv object GpuCsvUtils { def dateFormatInRead(options: CSVOptions): String = options.dateFormat + def timestampFormatInRead(options: CSVOptions): String = options.timestampFormat } diff --git a/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index b22da8a4f71..3cfe43fa5f5 100644 --- a/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.json object GpuJsonUtils { def dateFormatInRead(options: JSONOptions): String = options.dateFormat + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat } diff --git a/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala b/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala deleted file mode 100644 index 40fa54478af..00000000000 --- a/sql-plugin/src/main/311until330-nondb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022, NVIDIA CORPORATION. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.json.rapids.shims - -import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.catalyst.json.JSONOptions - -object FileOptionsShims { - - def timestampFormatInRead(fileOptions: Serializable): Option[String] = { - fileOptions match { - case csvOpts: CSVOptions => Option(csvOpts.timestampFormat) - case jsonOpts: JSONOptions => Option(jsonOpts.timestampFormat) - case _ => throw new RuntimeException("Wrong file options.") - } - } - -} diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala index b8736640a9f..866fa191e46 100644 --- a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala +++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.csv object GpuCsvUtils { def dateFormatInRead(options: CSVOptions): String = options.dateFormat + def timestampFormatInRead(options: CSVOptions): String = options.timestampFormat } diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index b22da8a4f71..3cfe43fa5f5 100644 --- a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -18,4 +18,5 @@ package org.apache.spark.sql.catalyst.json object GpuJsonUtils { def dateFormatInRead(options: JSONOptions): String = options.dateFormat + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat } diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala deleted file mode 100644 index 40fa54478af..00000000000 --- a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.catalyst.json.rapids.shims - -import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.catalyst.json.JSONOptions - -object FileOptionsShims { - - def timestampFormatInRead(fileOptions: Serializable): Option[String] = { - fileOptions match { - case csvOpts: CSVOptions => Option(csvOpts.timestampFormat) - case jsonOpts: JSONOptions => Option(jsonOpts.timestampFormat) - case _ => throw new RuntimeException("Wrong file options.") - } - } - -} diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala index 2b7e5b2193a..ac1b6fb2550 100644 --- a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -21,4 +21,11 @@ import org.apache.spark.sql.catalyst.util.DateFormatter object GpuCsvUtils { def dateFormatInRead(options: CSVOptions): String = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) + + def timestampFormatInRead(options: CSVOptions): String = options.timestampFormatInRead.getOrElse( + if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX" + } else { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" + }) } diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index cd112da4e7a..e9668f4154f 100644 --- a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -21,4 +21,11 @@ import org.apache.spark.sql.catalyst.util.DateFormatter object GpuJsonUtils { def dateFormatInRead(options: JSONOptions): String = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) + + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse( + if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX" + } else { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" + }) } diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala deleted file mode 100644 index e33d4b085fa..00000000000 --- a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.catalyst.json.rapids.shims - -import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.catalyst.json.JSONOptions - -object FileOptionsShims { - - def timestampFormatInRead(fileOptions: Serializable): Option[String] = { - fileOptions match { - case csvOpts: CSVOptions => csvOpts.dateFormatInRead - case jsonOpts: JSONOptions => jsonOpts.dateFormatInRead - case _ => throw new RuntimeException("Wrong file options.") - } - } - -} diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala index 2b7e5b2193a..2f41143f313 100644 --- a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala +++ b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -17,8 +17,16 @@ package org.apache.spark.sql.catalyst.csv import org.apache.spark.sql.catalyst.util.DateFormatter +import org.apache.spark.sql.internal.SQLConf object GpuCsvUtils { def dateFormatInRead(options: CSVOptions): String = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) + + def timestampFormatInRead(options: CSVOptions): String = options.timestampFormatInRead.getOrElse( + if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX" + } else { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" + }) } diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index cd112da4e7a..dc2160ed6d0 100644 --- a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -17,8 +17,16 @@ package org.apache.spark.sql.catalyst.json import org.apache.spark.sql.catalyst.util.DateFormatter +import org.apache.spark.sql.internal.SQLConf object GpuJsonUtils { def dateFormatInRead(options: JSONOptions): String = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) + + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse( + if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX" + } else { + s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" + }) } diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala deleted file mode 100644 index e33d4b085fa..00000000000 --- a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.json.rapids.shims - -import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.catalyst.json.JSONOptions - -object FileOptionsShims { - - def timestampFormatInRead(fileOptions: Serializable): Option[String] = { - fileOptions match { - case csvOpts: CSVOptions => csvOpts.dateFormatInRead - case jsonOpts: JSONOptions => jsonOpts.dateFormatInRead - case _ => throw new RuntimeException("Wrong file options.") - } - } - -} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index 15ab0b4987d..757ece86a35 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.csv.{CSVOptions, GpuCsvUtils} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.catalyst.json.rapids.shims.FileOptionsShims import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.util.PermissiveMode import org.apache.spark.sql.connector.read._ @@ -86,31 +85,6 @@ trait ScanWithMetrics { } object GpuCSVScan { - private val supportedDateFormats = Set( - "yyyy-MM-dd", - "yyyy/MM/dd", - "yyyy-MM", - "yyyy/MM", - "MM-yyyy", - "MM/yyyy", - "MM-dd-yyyy", - "MM/dd/yyyy" - // TODO "dd-MM-yyyy" and "dd/MM/yyyy" can also be supported, but only if we set - // dayfirst to true in the parser config. 
This is not plumbed into the java cudf yet - // and would need to coordinate with the timestamp format too, because both cannot - // coexist - ) - - private val supportedTsPortionFormats = Set( - "HH:mm:ss.SSSXXX", - "HH:mm:ss[.SSS][XXX]", - "HH:mm", - "HH:mm:ss", - "HH:mm[:ss]", - "HH:mm:ss.SSS", - "HH:mm:ss[.SSS]" - ) - def tagSupport(scanMeta: ScanMeta[CSVScan]) : Unit = { val scan = scanMeta.wrapped tagSupport( @@ -212,37 +186,27 @@ object GpuCSVScan { // parsedOptions.maxColumns was originally a performance optimization but is not used any more - if (readSchema.map(_.dataType).contains(DateType)) { - if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { - // Spark's CSV parser will parse the string "2020-50-16" to the date 2024/02/16 when - // timeParserPolicy is set to LEGACY mode and we would reject this as an invalid date - // so we fall back to CPU - meta.willNotWorkOnGpu(s"GpuCSVScan does not support timeParserPolicy=LEGACY") - } - DateUtils.tagAndGetCudfFormat(meta, + val types = readSchema.map(_.dataType).toSet + if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy && + (types.contains(DateType) || + types.contains(TimestampType))) { + // Spark's CSV parser will parse the string "2020-50-16" to the date 2024/02/16 when + // timeParserPolicy is set to LEGACY mode and we would reject this as an invalid date + // so we fall back to CPU + meta.willNotWorkOnGpu(s"GpuCSVScan does not support timeParserPolicy=LEGACY") + } + + if (types.contains(DateType)) { + GpuTextBasedDateUtils.tagCudfFormat(meta, GpuCsvUtils.dateFormatInRead(parsedOptions), parseString = true) } - if (readSchema.map(_.dataType).contains(TimestampType)) { - if (!meta.conf.isCsvTimestampReadEnabled) { - meta.willNotWorkOnGpu("GpuCSVScan does not support parsing timestamp types. 
To " + - s"enable it please set ${RapidsConf.ENABLE_CSV_TIMESTAMPS} to true.") - } + if (types.contains(TimestampType)) { if (!TypeChecks.areTimestampsSupported(parsedOptions.zoneId)) { meta.willNotWorkOnGpu("Only UTC zone id is supported") } - FileOptionsShims.timestampFormatInRead(parsedOptions).foreach { tsFormat => - val parts = tsFormat.split("'T'", 2) - if (parts.isEmpty) { - meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported") - } - if (parts.headOption.exists(h => !supportedDateFormats.contains(h))) { - meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported") - } - if (parts.length > 1 && !supportedTsPortionFormats.contains(parts(1))) { - meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported") - } - } + GpuTextBasedDateUtils.tagCudfFormat(meta, + GpuCsvUtils.timestampFormatInRead(parsedOptions), parseString = true) } // TODO parsedOptions.emptyValueInRead @@ -414,4 +378,5 @@ class CSVPartitionReader( } override def dateFormat: String = GpuCsvUtils.dateFormatInRead(parsedOptions) + override def timestampFormat: String = GpuCsvUtils.timestampFormatInRead(parsedOptions) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index b965bfedccf..8f98fef9c8f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer import scala.math.max import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table} +import com.nvidia.spark.rapids.DateUtils.{toStrf, TimestampFormatConversionException} import com.nvidia.spark.rapids.shims.GpuTypeShims import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -31,7 +32,8 @@ import org.apache.spark.TaskContext import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile} -import org.apache.spark.sql.rapids.ExceptionTimeParserPolicy +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.{ExceptionTimeParserPolicy, GpuToTimestamp, LegacyTimeParserPolicy} import org.apache.spark.sql.types.{DataTypes, DecimalType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -176,7 +178,8 @@ abstract class GpuTextBasedPartitionReader( f.dataType match { case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | - DataTypes.DoubleType | _: DecimalType | DataTypes.DateType => + DataTypes.DoubleType | _: DecimalType | DataTypes.DateType | + DataTypes.TimestampType => f.copy(dataType = DataTypes.StringType) case other if GpuTypeShims.supportCsvRead(other) => f.copy(dataType = DataTypes.StringType) @@ -221,7 +224,10 @@ abstract class GpuTextBasedPartitionReader( case dt: DecimalType => castStringToDecimal(table.getColumn(i), dt) case DataTypes.DateType => - castStringToDate(table.getColumn(i)) + castStringToDate(table.getColumn(i), DType.TIMESTAMP_DAYS, failOnInvalid = true) + case DataTypes.TimestampType => + castStringToTimestamp(table.getColumn(i), timestampFormat, + DType.TIMESTAMP_MICROSECONDS) case other if GpuTypeShims.supportCsvRead(other) => 
GpuTypeShims.csvRead(table.getColumn(i), other) case _ => @@ -241,20 +247,139 @@ abstract class GpuTextBasedPartitionReader( } def dateFormat: String + def timestampFormat: String - def castStringToDate(input: ColumnVector): ColumnVector = { + def castStringToDate(input: ColumnVector, dt: DType, failOnInvalid: Boolean): ColumnVector = { val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true) - withResource(input.isTimestamp(cudfFormat)) { isDate => - if (GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) { - withResource(isDate.all()) { all => - if (all.isValid && !all.getBoolean) { - throw new DateTimeException("One or more values is not a valid date") + withResource(input.strip()) { stripped => + withResource(stripped.isTimestamp(cudfFormat)) { isDate => + if (failOnInvalid && GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) { + withResource(isDate.all()) { all => + if (all.isValid && !all.getBoolean) { + throw new DateTimeException("One or more values is not a valid date") + } + } + } + withResource(stripped.asTimestamp(dt, cudfFormat)) { asDate => + withResource(Scalar.fromNull(dt)) { nullScalar => + isDate.ifElse(asDate, nullScalar) + } + } + } + } + } + + def castStringToTimestamp( + lhs: ColumnVector, + sparkFormat: String, + dtype: DType): ColumnVector = { + + val optionalSeconds = raw"(?:\:\d{2})?" + val optionalMicros = raw"(?:\.\d{1,6})?" + val twoDigits = raw"\d{2}" + val fourDigits = raw"\d{4}" + + val regexRoot = sparkFormat + .replace("'T'", "T") + .replace("yyyy", fourDigits) + .replace("MM", twoDigits) + .replace("dd", twoDigits) + .replace("HH", twoDigits) + .replace("mm", twoDigits) + .replace("[:ss]", optionalSeconds) + .replace(":ss", optionalSeconds) // Spark always treats seconds portion as optional + .replace("[.SSSXXX]", optionalMicros) + .replace("[.SSS][XXX]", optionalMicros) + .replace("[.SSS]", optionalMicros) + .replace("[.SSSSSS]", optionalMicros) + .replace(".SSSXXX", optionalMicros) + .replace(".SSSSSS", optionalMicros) + .replace(".SSS", optionalMicros) + + // Spark treats timestamp portion as optional always + val regexOptionalTime = regexRoot.split('T') match { + case Array(d, t) => + d + "(?:[ T]" + t + ")?" + case _ => + regexRoot + } + val regex = regexOptionalTime + raw"Z?\Z" + + // get a list of all possible cuDF formats that we need to check for + val cudfFormats = GpuTextBasedDateUtils.toCudfFormats(sparkFormat, parseString = true) + + + // filter by regexp first to eliminate invalid entries + val regexpFiltered = withResource(lhs.strip()) { stripped => + withResource(stripped.matchesRe(regex)) { matchesRe => + withResource(Scalar.fromNull(DType.STRING)) { nullString => + matchesRe.ifElse(stripped, nullString) + } + } + } + + // fix timestamps that have milliseconds but no microseconds + // example ".296" => ".296000" + val sanitized = withResource(regexpFiltered) { _ => + // cannot replace with back-refs directly because cuDF cannot support "\1000\2" so we + // first substitute with a placeholder and then replace that. 
The placeholder value + // `@` was chosen somewhat arbitrarily but should be safe since we do not support any + // date/time formats that contain the `@` character + val placeholder = "@" + withResource(regexpFiltered.stringReplaceWithBackrefs( + raw"(\.\d{3})(Z?)\Z", raw"\1$placeholder\2")) { tmp => + withResource(Scalar.fromString(placeholder)) { from => + withResource(Scalar.fromString("000")) { to => + tmp.stringReplace(from, to) } } } - withResource(input.asTimestamp(DType.TIMESTAMP_DAYS, cudfFormat)) { asDate => - withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { nullScalar => - isDate.ifElse(asDate, nullScalar) + } + + def isTimestamp(fmt: String): ColumnVector = { + val pos = fmt.indexOf('T') + if (pos == -1) { + sanitized.isTimestamp(fmt) + } else { + // Spark supports both ` ` and `T` as the delimiter so we have to test + // for both formats when calling `isTimestamp` in cuDF but the + // `asTimestamp` method ignores the delimiter so we only need to call that + // with one format + val withSpaceDelim = fmt.substring(0, pos) + ' ' + fmt.substring(pos + 1) + withResource(sanitized.isTimestamp(fmt)) { isValidFmt1 => + withResource(sanitized.isTimestamp(withSpaceDelim)) { isValidFmt2 => + isValidFmt1.or(isValidFmt2) + } + } + } + } + + def asTimestampOrNull(fmt: String): ColumnVector = { + withResource(Scalar.fromNull(dtype)) { nullScalar => + withResource(isTimestamp(fmt)) { isValid => + withResource(sanitized.asTimestamp(dtype, fmt)) { ts => + isValid.ifElse(ts, nullScalar) + } + } + } + } + + def asTimestampOr(fmt: String, orValue: ColumnVector): ColumnVector = { + withResource(orValue) { _ => + withResource(isTimestamp(fmt)) { isValid => + withResource(sanitized.asTimestamp(dtype, fmt)) { ts => + isValid.ifElse(ts, orValue) + } + } + } + } + + withResource(sanitized) { _ => + if (cudfFormats.length == 1) { + asTimestampOrNull(cudfFormats.head) + } else { + cudfFormats.tail.foldLeft(asTimestampOrNull(cudfFormats.head)) { (input, fmt) => + asTimestampOr(fmt, input) } } } @@ -364,3 +489,148 @@ abstract class GpuTextBasedPartitionReader( isExhausted = true } } + +object GpuTextBasedDateUtils { + + private val supportedDateFormats = Set( + "yyyy-MM-dd", + "yyyy/MM/dd", + "yyyy-MM", + "yyyy/MM", + "MM-yyyy", + "MM/yyyy", + "MM-dd-yyyy", + "MM/dd/yyyy", + "dd-MM-yyyy", + "dd/MM/yyyy" + ) + + private val supportedTsPortionFormats = Set( + "HH:mm:ss.SSSXXX", + "HH:mm:ss[.SSS][XXX]", + "HH:mm:ss[.SSSXXX]", + "HH:mm", + "HH:mm:ss", + "HH:mm[:ss]", + "HH:mm:ss.SSS", + "HH:mm:ss[.SSS]" + ) + + def tagCudfFormat( + meta: RapidsMeta[_, _, _], + sparkFormat: String, + parseString: Boolean): Unit = { + if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { + try { + // try and convert the format to cuDF format - this will throw an exception if + // the format contains unsupported characters or words + toCudfFormats(sparkFormat, parseString) + // format parsed ok but we have no 100% compatible formats in LEGACY mode + if (GpuToTimestamp.LEGACY_COMPATIBLE_FORMATS.contains(sparkFormat)) { + // LEGACY support has a number of issues that mean we cannot guarantee + // compatibility with CPU + // - we can only support 4 digit years but Spark supports a wider range + // - we use a proleptic Gregorian calender but Spark uses a hybrid Julian+Gregorian + // calender in LEGACY mode + if (SQLConf.get.ansiEnabled) { + meta.willNotWorkOnGpu("LEGACY format in ANSI mode is not supported on the GPU") + } else if (!meta.conf.incompatDateFormats) { + meta.willNotWorkOnGpu(s"LEGACY format 
'$sparkFormat' on the GPU is not guaranteed " + + s"to produce the same results as Spark on CPU. Set " + + s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.") + } + } else { + meta.willNotWorkOnGpu(s"LEGACY format '$sparkFormat' is not supported on the GPU.") + } + } catch { + case e: TimestampFormatConversionException => + meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") + } + } else { + val parts = sparkFormat.split("'T'", 2) + if (parts.isEmpty) { + meta.willNotWorkOnGpu(s"the timestamp format '$sparkFormat' is not supported") + } + if (parts.headOption.exists(h => !supportedDateFormats.contains(h))) { + meta.willNotWorkOnGpu(s"the timestamp format '$sparkFormat' is not supported") + } + if (parts.length > 1 && !supportedTsPortionFormats.contains(parts(1))) { + meta.willNotWorkOnGpu(s"the timestamp format '$sparkFormat' is not supported") + } + try { + // try and convert the format to cuDF format - this will throw an exception if + // the format contains unsupported characters or words + toCudfFormats(sparkFormat, parseString) + } catch { + case e: TimestampFormatConversionException => + meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}") + } + } + } + + /** + * Get the list of all cuDF formats that need to be checked for when parsing timestamps. The + * returned formats must be ordered such that the first format is the most lenient and the + * last is the least lenient. + * + * For example, the spark format `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]` would result in the + * following cuDF formats being returned, in this order: + * + * - `%Y-%m-%d` + * - `%Y-%m-%dT%H:%M` + * - `%Y-%m-%dT%H:%M:%S` + * - `%Y-%m-%dT%H:%M:%S.%f` + */ + def toCudfFormats(sparkFormat: String, parseString: Boolean): Seq[String] = { + val hasZsuffix = sparkFormat.endsWith("Z") + val formatRoot = if (hasZsuffix) { + sparkFormat.substring(0, sparkFormat.length-1) + } else { + sparkFormat + } + + // strip off suffixes that cuDF will not recognize + val cudfSupportedFormat = formatRoot + .replace("'T'", "T") + .replace("[.SSSXXX]", "") + .replace("[.SSS][XXX]", "") + .replace("[.SSS]", "") + .replace("[.SSSSSS]", "") + .replace(".SSSXXX", "") + .replace(".SSS", "") + .replace("[:ss]", "") + + val cudfFormat = toStrf(cudfSupportedFormat, parseString) + val suffix = if (hasZsuffix) "Z" else "" + + val optionalFractional = Seq("[.SSS][XXX]", "[.SSS]", "[.SSSSSS]", "[.SSS][XXX]", + ".SSSXXX", ".SSS") + val baseFormats = if (optionalFractional.exists(formatRoot.endsWith)) { + val cudfFormat1 = cudfFormat + suffix + val cudfFormat2 = cudfFormat + ".%f" + suffix + Seq(cudfFormat1, cudfFormat2) + } else if (formatRoot.endsWith("[:ss]")) { + Seq(cudfFormat + ":%S" + suffix) + } else { + Seq(cudfFormat) + } + + val pos = baseFormats.head.indexOf('T') + val formatsIncludingDateOnly = if (pos == -1) { + baseFormats + } else { + Seq(baseFormats.head.substring(0, pos)) ++ baseFormats + } + + // seconds are always optional in Spark + val formats = ListBuffer[String]() + for (fmt <- formatsIncludingDateOnly) { + if (fmt.contains(":%S") && !fmt.contains("%f")) { + formats += fmt.replace(":%S", "") + } + formats += fmt + } + formats + } + +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index d03614b66af..35bb7d63de4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -889,14 +889,6 @@ object RapidsConf { .booleanConf .createWithDefault(true) - // TODO should we change this config? - val ENABLE_CSV_TIMESTAMPS = conf("spark.rapids.sql.csvTimestamps.enabled") - .doc("When set to true, enables the CSV parser to read timestamps. The default output " + - "format for Spark includes a timezone at the end. Anything except the UTC timezone is " + - "not supported. Timestamps after 2038 and before 1902 are also not supported.") - .booleanConf - .createWithDefault(false) - val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled") .doc("When set to true enables all json input and output acceleration. " + "(only input is currently supported anyways)") @@ -1609,8 +1601,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCastFloatToIntegralTypesEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES) - lazy val isCsvTimestampReadEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS) - lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING) lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 44c1d2ce866..82c9c90eacd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -2144,17 +2144,7 @@ object SupportedOpsForTools { val read = ioMap(ReadFileOp) // we have lots of configs for various operations, just try to get the main ones val readOps = types.map { t => - val typeEnabled = if (format.toString.toLowerCase.equals("csv")) { - t.toString match { - case "TIMESTAMP" => conf.isCsvTimestampReadEnabled - case _ => true - } - } else { - t.toString match { - case _ => true - } - } - if (!formatEnabled || !typeEnabled) { + if (!formatEnabled) { // indicate configured off by default "CO" } else { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 447862225a6..54c4a0593fa 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.json.{GpuJsonUtils, JSONOptions, JSONOptionsInRead} -import org.apache.spark.sql.catalyst.json.rapids.shims.FileOptionsShims import org.apache.spark.sql.catalyst.util.PermissiveMode import org.apache.spark.sql.connector.read.{PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.QueryExecutionException @@ -46,31 +45,6 @@ import org.apache.spark.util.SerializableConfiguration object GpuJsonScan { - private val supportedDateFormats = Set( - "yyyy-MM-dd", - "yyyy/MM/dd", - "yyyy-MM", - "yyyy/MM", - "MM-yyyy", - "MM/yyyy", - "MM-dd-yyyy", - "MM/dd/yyyy" - // TODO "dd-MM-yyyy" and "dd/MM/yyyy" can also be supported, but only if we set - // dayfirst to true in the parser config. 
This is not plumbed into the java cudf yet - // and would need to coordinate with the timestamp format too, because both cannot - // coexist - ) - - private val supportedTsPortionFormats = Set( - "HH:mm:ss.SSSXXX", - "HH:mm:ss[.SSS][XXX]", - "HH:mm", - "HH:mm:ss", - "HH:mm[:ss]", - "HH:mm:ss.SSS", - "HH:mm:ss[.SSS]" - ) - def tagSupport(scanMeta: ScanMeta[JsonScan]) : Unit = { val scan = scanMeta.wrapped tagSupport( @@ -148,7 +122,7 @@ object GpuJsonScan { }) if (readSchema.map(_.dataType).contains(DateType)) { - DateUtils.tagAndGetCudfFormat(meta, + GpuTextBasedDateUtils.tagCudfFormat(meta, GpuJsonUtils.dateFormatInRead(parsedOptions), parseString = true) } @@ -156,18 +130,8 @@ object GpuJsonScan { if (!TypeChecks.areTimestampsSupported(parsedOptions.zoneId)) { meta.willNotWorkOnGpu("Only UTC zone id is supported") } - FileOptionsShims.timestampFormatInRead(parsedOptions).foreach { tsFormat => - val parts = tsFormat.split("'T'", 2) - if (parts.isEmpty) { - meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported") - } - if (parts.headOption.exists(h => !supportedDateFormats.contains(h))) { - meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported") - } - if (parts.length > 1 && !supportedTsPortionFormats.contains(parts(1))) { - meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported") - } - } + GpuTextBasedDateUtils.tagCudfFormat(meta, + GpuJsonUtils.timestampFormatInRead(parsedOptions), parseString = true) } dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex => @@ -412,5 +376,5 @@ class JsonPartitionReader( } override def dateFormat: String = GpuJsonUtils.dateFormatInRead(parsedOptions) - + override def timestampFormat: String = GpuJsonUtils.timestampFormatInRead(parsedOptions) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala index 531bffc7d92..03c72da0e55 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala @@ -18,14 +18,12 @@ package com.nvidia.spark.rapids import org.apache.spark.SparkConf import org.apache.spark.sql.functions.{col, date_add, lit} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{StructField, StructType, TimestampType} class CsvScanSuite extends SparkQueryCompareTestSuite { - testExpectedException[IllegalArgumentException]("Test CSV projection including unsupported types", - _.getMessage.startsWith("Part of the plan is not columnar"), - mixedTypesFromCsvWithHeader, - // Shuffle can go back the the CPU because teh CSV read is not on the GPU, but we want to make - // sure the error is wht we expect - execsAllowedNonGpu = Seq("ShuffleExchangeExec")) { + testSparkResultsAreEqual("Test CSV projection with whitespace delimiter between date and time", + mixedTypesFromCsvWithHeader) { frame => frame.select(col("c_string"), col("c_int"), col("c_timestamp")) } @@ -75,4 +73,35 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { conf=new SparkConf()) { df => df.withColumn("next_day", date_add(col("dates"), lit(1))) } + + // Fails with Spark 3.2.0 and later - see https://github.com/NVIDIA/spark-rapids/issues/4940 + testSparkResultsAreEqual( + "Test CSV parse ints as timestamps ansiEnabled=false", + intsAsTimestampsFromCsv, + assumeCondition = _ => (!VersionUtils.isSpark320OrLater, + "https://github.com/NVIDIA/spark-rapids/issues/4940"), + conf=new 
SparkConf().set(SQLConf.ANSI_ENABLED.key, "false")) { + df => df + } + + // Fails with Spark 3.2.0 and later - see https://github.com/NVIDIA/spark-rapids/issues/4940 + testSparkResultsAreEqual( + "Test CSV parse ints as timestamps ansiEnabled=true", + intsAsTimestampsFromCsv, + assumeCondition = _ => (!VersionUtils.isSpark320OrLater, + "https://github.com/NVIDIA/spark-rapids/issues/4940"), + conf=new SparkConf().set(SQLConf.ANSI_ENABLED.key, "true")) { + df => df + } + + private def intsAsTimestampsFromCsv = { + fromCsvDf("ints.csv", StructType(Array( + StructField("ints_1", TimestampType), + StructField("ints_2", TimestampType), + StructField("ints_3", TimestampType), + StructField("ints_4", TimestampType) + )))(_) + } + + } diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv index 821acaa19cb..45128d0d069 100644 --- a/tools/src/main/resources/supportedDataSource.csv +++ b/tools/src/main/resources/supportedDataSource.csv @@ -1,6 +1,6 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT Avro,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO -CSV,read,S,S,S,S,S,S,S,S,CO,S,S,NA,NS,NA,NA,NA,NA,NA +CSV,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,NA,NA,NA,NA JSON,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
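
As a usage sketch for the JSON timestamp limitation documented in the compatibility.md change above: numeric values cannot currently be read directly as timestamps ([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)), so the suggested workaround is to read the column as a long and cast it afterwards. The minimal PySpark sketch below illustrates that workaround only; the file path, column name, and epoch unit (seconds) are illustrative assumptions and are not part of this change.

    # Minimal sketch of the workaround noted in docs/compatibility.md: read numeric
    # epoch values with a LongType schema, then cast the column to timestamp.
    # The file path, column name, and epoch unit (seconds) are assumptions for
    # illustration only.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import LongType, StructField, StructType

    spark = SparkSession.builder.appName("json-timestamp-workaround").getOrCreate()

    # Read the numeric column as a long instead of a timestamp.
    schema = StructType([StructField("number", LongType())])
    df = spark.read.schema(schema).json("/path/to/epoch_seconds.json")

    # Casting LongType to timestamp interprets the value as seconds since the epoch.
    df = df.withColumn("number", col("number").cast("timestamp"))
    df.show(truncate=False)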