
fix Date serialization
Davies Liu committed Apr 9, 2015
1 parent 59266d1 commit bac3a6b
Showing 15 changed files with 30 additions and 26 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -22,6 +22,8 @@ import java.sql.{Date, Time}
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.DateUtils
+
 /**
  * Utility functions to serialize, deserialize objects to / from R
  */
@@ -101,7 +103,7 @@ private[spark] object SerDe {
 
   def readDate(in: DataInputStream): Date = {
     val d = in.readInt()
-    new Date(d.toLong * 24 * 3600 * 1000)
+    DateUtils.toJavaDate(d)
   }
 
   def readTime(in: DataInputStream): Time = {
@@ -277,7 +279,8 @@ private[spark] object SerDe {
   }
 
   def writeDate(out: DataOutputStream, value: Date): Unit = {
-    out.writeInt((value.getTime / 1000 / 3600 / 24).toInt)
+    val d = DateUtils.fromJavaDate(value)
+    out.writeInt(d)
  }
 
   def writeTime(out: DataOutputStream, value: Time): Unit = {
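The removed arithmetic divides `java.sql.Date.getTime` (milliseconds of *local* midnight) by a UTC day length, which only round-trips correctly when the JVM's timezone is UTC; `DateUtils` corrects for the zone offset on both paths. A small self-contained sketch of the failure mode (illustrative, not part of the commit):

```scala
import java.sql.Date
import java.util.TimeZone

// Illustrative sketch: demonstrates why the removed millisecond
// arithmetic in SerDe.readDate/writeDate is timezone-sensitive.
object NaiveDateSerDe {
  def main(args: Array[String]): Unit = {
    // getTime returns millis for *local* midnight, so UTC-based
    // division shifts the day east of Greenwich.
    TimeZone.setDefault(TimeZone.getTimeZone("GMT+08:00"))
    val d = Date.valueOf("1970-01-02") // local midnight = 1970-01-01T16:00Z

    val days = (d.getTime / 1000 / 3600 / 24).toInt // old writeDate logic
    println(days) // prints 0, but 1970-01-02 is day 1

    // The old readDate logic has the mirror-image bug west of Greenwich:
    TimeZone.setDefault(TimeZone.getTimeZone("GMT-08:00"))
    println(new Date(1L * 24 * 3600 * 1000)) // prints 1970-01-01, not 1970-01-02
  }
}
```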
@@ -15,13 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.types
+package org.apache.spark.util
 
 import java.sql.Date
 import java.util.{Calendar, TimeZone}
 
-import org.apache.spark.sql.catalyst.expressions.Cast
-
 /**
  * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
  */
@@ -55,6 +53,4 @@ object DateUtils {
   def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
     new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
   }
-
-  def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
 }
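Moving this file from `org.apache.spark.sql.types` to `org.apache.spark.util` is what lets core's `SerDe` reuse the conversions without a catalyst dependency, and dropping `toString` removes the last reference to `Cast`. The `toMillisSinceEpoch` helper is outside this hunk; a rough sketch of the offset correction such a conversion needs (names illustrative, not the file's actual code):

```scala
import java.util.TimeZone

// Rough sketch of offset-correcting day/date conversion in the style of
// DateUtils; the real toMillisSinceEpoch body is not shown in the diff.
object DateConversionSketch {
  private val MillisPerDay = 24L * 3600 * 1000

  // Local midnight -> days since 1970-01-01: add the zone offset first,
  // so the division sees day-aligned "local" millis.
  def millisToDays(millisLocal: Long): Int = {
    val offset = TimeZone.getDefault.getOffset(millisLocal)
    ((millisLocal + offset) / MillisPerDay).toInt
  }

  // Days since 1970-01-01 -> millis of local midnight: undo the offset.
  def daysToMillis(days: Int): Long = {
    val millisUtc = days.toLong * MillisPerDay
    millisUtc - TimeZone.getDefault.getOffset(millisUtc)
  }

  def main(args: Array[String]): Unit = {
    TimeZone.setDefault(TimeZone.getTimeZone("GMT+08:00"))
    val d = java.sql.Date.valueOf("1970-01-02")
    val days = millisToDays(d.getTime)
    println(days)                                  // 1
    println(new java.sql.Date(daysToMillis(days))) // 1970-01-02
  }
}
```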
@@ -7,16 +7,12 @@
 
 package org.apache.spark.repl
 
-import org.apache.spark.annotation.DeveloperApi
-
 import scala.tools.nsc._
+import scala.collection.mutable.ListBuffer
+import scala.tools.nsc.interpreter.Completion._
 import scala.tools.nsc.interpreter._
-
-import scala.tools.jline._
-import scala.tools.jline.console.completer._
-import Completion._
-import scala.collection.mutable.ListBuffer
 import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
 
 /**
  * Represents an auto-completion tool for the supplied interpreter that
@@ -33,10 +29,10 @@ class SparkJLineCompletion(val intp: SparkIMain) extends Completion with Complet
   val global: intp.global.type = intp.global
 
   import global._
-  import definitions.{ PredefModule, AnyClass, AnyRefClass, ScalaPackage, JavaLangPackage }
-  import rootMirror.{ RootClass, getModuleIfDefined }
+  import definitions.{AnyClass, AnyRefClass, JavaLangPackage, PredefModule, ScalaPackage}
+  import rootMirror.{RootClass, getModuleIfDefined}
   type ExecResult = Any
-  import intp.{ debugging }
+  import intp.debugging
 
   /**
    * Represents the level of verbosity. Increments with consecutive tabs.
2 changes: 1 addition & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import scala.util.hashing.MurmurHash3
 
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.types.{StructType, DateUtils}
+import org.apache.spark.sql.types.StructType
 
 object Row {
   /**
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst
 
 import java.sql.Timestamp
 
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, DateUtils}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
@@ -21,8 +21,8 @@ import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.types._
+import org.apache.spark.util.DateUtils
 
 /** Cast the child expression to the target data type. */
 case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with Logging {
@@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
-    case DateType => buildCast[Int](_, d => DateUtils.toString(d))
+    case DateType => buildCast[Int](_, d =>
+      Cast.threadLocalDateFormat.get.format(DateUtils.toJavaDate(d)))
     case TimestampType => buildCast[Timestamp](_, timestampToString)
     case _ => buildCast[Any](_, _.toString)
   }
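The inlined formatting keeps the behavior `DateUtils.toString` used to provide, via `Cast.threadLocalDateFormat`. That field's definition is not part of this diff; a plausible sketch of its shape (assumed, not quoted from the file):

```scala
import java.text.{DateFormat, SimpleDateFormat}

// Sketch of the thread-local formatter the new cast relies on.
// SimpleDateFormat is mutable and not thread-safe, so each thread gets
// its own instance rather than synchronizing on a shared one.
object DateFormatSketch {
  val threadLocalDateFormat: ThreadLocal[DateFormat] = new ThreadLocal[DateFormat] {
    override def initialValue(): DateFormat = new SimpleDateFormat("yyyy-MM-dd")
  }

  def main(args: Array[String]): Unit = {
    val d = new java.sql.Date(0L) // the epoch instant, rendered in the local zone
    println(threadLocalDateFormat.get.format(d)) // e.g. 1970-01-01
  }
}
```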
@@ -247,7 +247,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
       """.children
 
     case Cast(child @ DateType(), StringType) =>
-      child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)
+      child.castOrNull(c =>
+        q"""org.apache.spark.sql.catalyst.expressions.Cast.threadLocalDateFormat.get.format(
+          org.apache.spark.util.DateUtils.toJavaDate($c.toInt))
+        """,
+        StringType)
 
     case Cast(child @ NumericType(), IntegerType) =>
       child.castOrNull(c => q"$c.toInt", IntegerType)
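Generated code is compiled without the enclosing file's imports, which is why the replacement tree spells both `Cast` and `DateUtils` fully qualified. A toy illustration (hypothetical, requires scala-reflect on the classpath) of how such a quasiquote expands:

```scala
import scala.reflect.runtime.universe._

// Toy sketch: builds the same shape of tree the code generator splices in.
object QuasiquoteSketch {
  def main(args: Array[String]): Unit = {
    val c = q"daysSinceEpoch" // stands in for the child expression's term
    val tree = q"""org.apache.spark.util.DateUtils.toJavaDate($c.toInt)"""
    println(showCode(tree)) // org.apache.spark.util.DateUtils.toJavaDate(daysSinceEpoch.toInt)
  }
}
```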
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.types._
+import org.apache.spark.util.DateUtils
 
 object Literal {
   def apply(v: Any): Literal = v match {
@@ -28,7 +28,7 @@ import org.scalatest.Matchers._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedGetField
 import org.apache.spark.sql.types._
-
+import org.apache.spark.util.DateUtils
 
 class ExpressionEvaluationBaseSuite extends FunSuite {
 
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types._
 import org.apache.spark.{Accumulator, Logging => SparkLogging}
+import org.apache.spark.util.DateUtils
 
 /**
  * A serialized version of a Python lambda function. Suitable for use in a [[PythonRDD]].
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.types._
 import org.apache.spark.Logging
+import org.apache.spark.util.DateUtils
 
 private[sql] object JsonRDD extends Logging {
 
@@ -22,15 +22,14 @@ import java.sql.{Date, Timestamp}
 import org.scalactic.Tolerance._
 
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.json.JsonRDD.{compatibleType, enforceCorrectType}
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{DateUtils, Utils}
 
 class JsonSuite extends QueryTest {
   import org.apache.spark.sql.json.TestJsonData._
@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
+import org.apache.spark.util.DateUtils
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
 // with an empty configuration (it is after all not intended to be used in this way?)
@@ -26,6 +26,7 @@ import org.apache.hadoop.{io => hadoopIo}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
+import org.apache.spark.util.DateUtils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -34,7 +34,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.DateUtils
+import org.apache.spark.util.DateUtils
 
 /**
  * A trait for subclasses that handle table scans.
