[SPARK-25945][SQL] Support locale while parsing date/timestamp from CSV/JSON #22951

Closed · wants to merge 12 commits
15 changes: 11 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -177,7 +177,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
dropFieldIfAllNull=None, encoding=None):
dropFieldIfAllNull=None, encoding=None, locale=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -249,6 +249,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param dropFieldIfAllNull: whether to ignore column of all null values or empty
array/struct during schema inference. If None is set, it
uses the default value, ``false``.
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.

>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -267,7 +270,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -349,7 +353,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None):
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None):
Member: Let's add `emptyValue` in streaming.py in the same separate PR.

Member Author: It seems it exists in streaming.py:

    enforceSchema=None, emptyValue=None):

r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -446,6 +450,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
If None is set, it uses the default value, ``1.0``.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, empty string.
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
Member: I think ideally we should apply this to decimal parsing too, actually. But yeah, we can leave it for a separate PR.

Member Author: Parsing decimals with a locale looks slightly tricky in the JSON case because we delegate that to Jackson, calling its getCurrentToken and getDecimalValue methods, and I haven't found a way to pass a locale to them. We would probably need a custom deserializer.

In the CSV case it should be easier, since we convert the strings ourselves. I will try to do that for CSV first, once this PR is merged.
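For reference, a minimal sketch of what a locale-aware CSV conversion could look like, using java.text.DecimalFormat (an illustration only, not the code that ended up in the follow-up PR):

```scala
import java.text.{DecimalFormat, NumberFormat}
import java.util.Locale

// Convert a CSV field to BigDecimal honoring the locale's decimal
// separator (e.g. "1000,5" parses as 1000.5 under ru-RU).
def parseDecimal(s: String, locale: Locale): java.math.BigDecimal = {
  val format = NumberFormat.getInstance(locale).asInstanceOf[DecimalFormat]
  format.setParseBigDecimal(true) // make parse() return BigDecimal, not Double
  format.parse(s).asInstanceOf[java.math.BigDecimal]
}
```

Since `locale` is already plumbed through the options, such a format object could be built once per CSVOptions instance rather than per value.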

Member Author: Here is the PR for parsing decimals from CSV: #22979


>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -465,7 +472,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue)
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -131,13 +131,16 @@ class CSVOptions(
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)

val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
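To see why the locale matters for these formats, here is a small round-trip sketch outside the diff (assuming commons-lang3 on the classpath, which Spark already ships):

```scala
import java.util.Locale
import org.apache.commons.lang3.time.FastDateFormat

// "MMM" resolves month names per locale, so the same pattern formats
// and parses differently under en-US and ru-RU.
val us = FastDateFormat.getInstance("MMM yyyy", Locale.US)
val ru = FastDateFormat.getInstance("MMM yyyy", Locale.forLanguageTag("ru-RU"))

val date = us.parse("Nov 2018")    // parses under en-US
val ruString = ru.format(date)     // Russian month abbreviation
assert(ru.parse(ruString) == date) // round-trips under ru-RU
// us.parse(ruString) would throw ParseException: the month name is not English
```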

@@ -76,16 +76,19 @@ private[sql] class JSONOptions(
// Whether to ignore column of all null values or empty array/struct during schema inference
val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)

// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)

val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

@@ -17,7 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import java.util.Calendar
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale}

import org.scalatest.exceptions.TestFailedException

@@ -209,4 +210,20 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P
"2015-12-31T16:00:00"
)
}

test("parse date with locale") {
Seq("en-US", "ru-RU").foreach { langTag =>
val locale = Locale.forLanguageTag(langTag)
val date = new SimpleDateFormat("yyyy-MM-dd").parse("2018-11-05")
val schema = new StructType().add("d", DateType)
val dateFormat = "MMM yyyy"
val sdf = new SimpleDateFormat(dateFormat, locale)
val dateStr = sdf.format(date)
val options = Map("dateFormat" -> dateFormat, "locale" -> langTag)

checkEvaluation(
CsvToStructs(schema, options, Literal.create(dateStr), gmtId),
InternalRow(17836)) // 2018-11-01 as days since 1970-01-01 ("MMM yyyy" has no day-of-month, so it defaults to the 1st)
}
}
}
@@ -17,7 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import java.util.Calendar
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale}

import org.scalatest.exceptions.TestFailedException

@@ -737,4 +738,20 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
CreateMap(Seq(Literal.create("allowNumericLeadingZeros"), Literal.create("true")))),
"struct<col:bigint>")
}

test("parse date with locale") {
Seq("en-US", "ru-RU").foreach { langTag =>
val locale = Locale.forLanguageTag(langTag)
val date = new SimpleDateFormat("yyyy-MM-dd").parse("2018-11-05")
val schema = new StructType().add("d", DateType)
val dateFormat = "MMM yyyy"
val sdf = new SimpleDateFormat(dateFormat, locale)
val dateStr = s"""{"d":"${sdf.format(date)}"}"""
val options = Map("dateFormat" -> dateFormat, "locale" -> langTag)

checkEvaluation(
JsonToStructs(schema, options, Literal.create(dateStr), gmtId),
InternalRow(17836)) // 2018-11-01 as days since 1970-01-01 ("MMM yyyy" has no day-of-month, so it defaults to the 1st)
}
}
}
@@ -384,6 +384,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* for schema inferring.</li>
* <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of all null values or
* empty array/struct during schema inference.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* </ul>
*
* @since 2.0.0
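To make the new option concrete, reading locale-formatted dates would look roughly like this (a sketch: the path and data are hypothetical):

```scala
// Hypothetical input, one JSON record per line, with dates written
// using Russian month names, e.g. {"d": "нояб. 2018"}:
val df = spark.read
  .schema("d DATE")                 // DDL-formatted schema
  .option("dateFormat", "MMM yyyy") // month-name pattern
  .option("locale", "ru-RU")        // resolve month names in Russian
  .json("path/to/dates.json")
```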
@@ -604,6 +606,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* </ul>
*
* @since 2.0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* that should be used for parsing.</li>
* <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of all null values or
* empty array/struct during schema inference.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* </ul>
*
* @since 2.0.0
@@ -372,6 +374,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* </ul>
*
* @since 2.0.0
@@ -17,6 +17,9 @@

package org.apache.spark.sql

import java.text.SimpleDateFormat
import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.spark.SparkException
@@ -117,4 +120,18 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
"Acceptable modes are PERMISSIVE and FAILFAST."))
}
}

test("parse timestamps with locale") {
Seq("en-US", "ko-KR", "zh-CN", "ru-RU").foreach { langTag =>
val locale = Locale.forLanguageTag(langTag)
val ts = new SimpleDateFormat("dd/MM/yyyy HH:mm").parse("06/11/2018 18:00")
val timestampFormat = "dd MMM yyyy HH:mm"
val sdf = new SimpleDateFormat(timestampFormat, locale)
val input = Seq(s"""${sdf.format(ts)}""").toDS()
val options = Map("timestampFormat" -> timestampFormat, "locale" -> langTag)
val df = input.select(from_csv($"value", lit("time timestamp"), options.asJava))

checkAnswer(df, Row(Row(java.sql.Timestamp.valueOf("2018-11-06 18:00:00.0"))))
}
}
}
@@ -17,6 +17,9 @@

package org.apache.spark.sql

import java.text.SimpleDateFormat
import java.util.Locale

import collection.JavaConverters._

import org.apache.spark.SparkException
@@ -578,4 +581,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
"Acceptable modes are PERMISSIVE and FAILFAST."))
}
}

test("parse timestamps with locale") {
Seq("en-US", "ko-KR", "zh-CN", "ru-RU").foreach { langTag =>
val locale = Locale.forLanguageTag(langTag)
val ts = new SimpleDateFormat("dd/MM/yyyy HH:mm").parse("06/11/2018 18:00")
val timestampFormat = "dd MMM yyyy HH:mm"
val sdf = new SimpleDateFormat(timestampFormat, locale)
val input = Seq(s"""{"time": "${sdf.format(ts)}"}""").toDS()
val options = Map("timestampFormat" -> timestampFormat, "locale" -> langTag)
val df = input.select(from_json($"value", "time timestamp", options))

checkAnswer(df, Row(Row(java.sql.Timestamp.valueOf("2018-11-06 18:00:00.0"))))
}
}
}