[SPARK-6411] [SQL] [PySpark] support date/datetime with timezone in Python

Spark SQL does not support timezones, and Pyrolite does not handle timezones well either. This patch converts datetimes into POSIX timestamps (avoiding any timezone confusion), which is the representation SQL uses. If a datetime object has no timezone, it is treated as local time.

The timezone of a datetime in an RDD is lost after one round trip; all datetimes coming back from SQL will be in local time.

Because of Pyrolite, datetimes coming from SQL only have millisecond precision.

This PR also drops the timezone from dates, converting them to the number of days since the epoch (which is what SQL uses).
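As a rough sketch of the conversion described above (the helper names below are illustrative, not part of the patch): a date becomes the number of days between it and 1970-01-01, and a datetime becomes a POSIX timestamp taken from its UTC time tuple when it carries a tzinfo, or from the local time tuple otherwise; the converter in types.py then scales that to an integer with int(seconds * 1e7 + microseconds * 10).

import calendar
import datetime
import time

EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()

def date_to_days(d):
    # date -> days since the epoch; any timezone information is simply dropped
    return d.toordinal() - EPOCH_ORDINAL

def datetime_to_posix(dt):
    # tz-aware datetimes go through their UTC time tuple,
    # naive datetimes are interpreted as local time
    if dt.tzinfo is not None:
        seconds = calendar.timegm(dt.utctimetuple())
    else:
        seconds = time.mktime(dt.timetuple())
    return seconds + dt.microsecond / 1e6

date_to_days(datetime.date(1970, 1, 2))  # 1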

Author: Davies Liu <[email protected]>

Closes apache#6250 from davies/tzone and squashes the following commits:

44d8497 [Davies Liu] add timezone support for DateType
99d9d9c [Davies Liu] use int for timestamp
10aa7ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into tzone
6a29aa4 [Davies Liu] support datetime with timezone
Davies Liu authored and rxin committed Jun 11, 2015
1 parent 6b68366 commit 424b007
Showing 3 changed files with 51 additions and 11 deletions.
32 changes: 32 additions & 0 deletions python/pyspark/sql/tests.py
@@ -26,6 +26,7 @@
import tempfile
import pickle
import functools
import time
import datetime

import py4j
@@ -47,6 +48,20 @@
from pyspark.sql.window import Window


class UTC(datetime.tzinfo):
    """UTC"""
    ZERO = datetime.timedelta(0)

    def utcoffset(self, dt):
        return self.ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return self.ZERO


class ExamplePointUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePoint.
@@ -588,6 +603,23 @@ def test_filter_with_datetime(self):
        self.assertEqual(0, df.filter(df.date > date).count())
        self.assertEqual(0, df.filter(df.time > time).count())

    def test_time_with_timezone(self):
        day = datetime.date.today()
        now = datetime.datetime.now()
        ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
        # class in __main__ is not serializable
        from pyspark.sql.tests import UTC
        utc = UTC()
        utcnow = datetime.datetime.fromtimestamp(ts, utc)
        df = self.sqlCtx.createDataFrame([(day, now, utcnow)])
        day1, now1, utcnow1 = df.first()
        # Pyrolite serialize java.sql.Date as datetime, will be fixed in new version
        self.assertEqual(day1.date(), day)
        # Pyrolite does not support microsecond, the error should be
        # less than 1 millisecond
        self.assertTrue(now - now1 < datetime.timedelta(0.001))
        self.assertTrue(now - utcnow1 < datetime.timedelta(0.001))

    def test_dropna(self):
        schema = StructType([
            StructField("name", StringType(), True),
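A minimal sketch of the round trip this test exercises, assuming a running SQLContext bound to the name sqlCtx (as in the test suite); the UTC helper here stands in for the one defined above in tests.py:

from datetime import datetime, timedelta, tzinfo

class UTC(tzinfo):
    # fixed zero offset, mirroring the helper class added in tests.py
    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)

aware = datetime(2015, 6, 11, 12, 0, 0, tzinfo=UTC())
df = sqlCtx.createDataFrame([(aware,)], ["ts"])
local = df.first().ts
# the timezone is gone after the round trip; the value comes back as local time
assert local.tzinfo is None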
27 changes: 18 additions & 9 deletions python/pyspark/sql/types.py
@@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
            _need_python_to_sql_conversion(dataType.valueType)
    elif isinstance(dataType, UserDefinedType):
        return True
    elif isinstance(dataType, TimestampType):
    elif isinstance(dataType, (DateType, TimestampType)):
        return True
    else:
        return False


EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()


def _python_to_sql_converter(dataType):
"""
Returns a converter that converts a Python object into a SQL datum for the given type.
@@ -698,26 +701,32 @@ def converter(obj):
                    return tuple(c(d.get(n)) for n, c in zip(names, converters))
                else:
                    return tuple(c(v) for c, v in zip(converters, obj))
            else:
            elif obj is not None:
                raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType))
        return converter
    elif isinstance(dataType, ArrayType):
        element_converter = _python_to_sql_converter(dataType.elementType)
        return lambda a: [element_converter(v) for v in a]
        return lambda a: a and [element_converter(v) for v in a]
    elif isinstance(dataType, MapType):
        key_converter = _python_to_sql_converter(dataType.keyType)
        value_converter = _python_to_sql_converter(dataType.valueType)
        return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
        return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])

    elif isinstance(dataType, UserDefinedType):
        return lambda obj: dataType.serialize(obj)
        return lambda obj: obj and dataType.serialize(obj)

    elif isinstance(dataType, DateType):
        return lambda d: d and d.toordinal() - EPOCH_ORDINAL

    elif isinstance(dataType, TimestampType):

        def to_posix_timstamp(dt):
            if dt.tzinfo is None:
                return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
            else:
                return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
            if dt:
                seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                           else time.mktime(dt.timetuple()))
                return int(seconds * 1e7 + dt.microsecond * 10)
        return to_posix_timstamp

    else:
        raise ValueError("Unexpected type %r" % dataType)

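The converters above use Python's short-circuit pattern x and f(x) to pass None through unchanged, and timestamps are stored as integers scaled by 1e7 with the microseconds added as microseconds * 10. A tiny standalone sketch of that pattern (not taken from the module itself):

# None-propagating converter: returns None for None, otherwise the converted value
to_strings = lambda a: a and [str(v) for v in a]
to_strings(None)       # -> None
to_strings([1, 2, 3])  # -> ['1', '2', '3']

# timestamp scaling as in to_posix_timstamp: 3 seconds and 250000 microseconds
int(3 * 1e7 + 250000 * 10)  # -> 32500000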
@@ -28,8 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.{Row, _}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
