Skip to content

Commit

Permalink
Added support for DateType and TimestampType primitive spark types (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajayborra authored and tovbinm committed Sep 29, 2018
1 parent aca9100 commit 978ece4
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,14 @@ case object FeatureSparkTypes {
def featureTypeTagOf(sparkType: DataType, isNullable: Boolean): WeakTypeTag[_ <: FeatureType] = sparkType match {
case DoubleType if !isNullable => weakTypeTag[types.RealNN]
case DoubleType => weakTypeTag[types.Real]
case FloatType if !isNullable => weakTypeTag[types.RealNN]
case FloatType => weakTypeTag[types.Real]
case ByteType => weakTypeTag[types.Integral]
case ShortType => weakTypeTag[types.Integral]
case IntegerType => weakTypeTag[types.Integral]
case LongType => weakTypeTag[types.Integral]
case DateType => weakTypeTag[types.Date]
case TimestampType => weakTypeTag[types.DateTime]
case ArrayType(StringType, _) => weakTypeTag[types.TextList]
case StringType => weakTypeTag[types.Text]
case BooleanType => weakTypeTag[types.Binary]
Expand All @@ -213,6 +220,8 @@ case object FeatureSparkTypes {
case MapType(StringType, ArrayType(StringType, _), _) => weakTypeTag[types.MultiPickListMap]
case MapType(StringType, ArrayType(DoubleType, _), _) => weakTypeTag[types.GeolocationMap]
case VectorType => weakTypeTag[types.OPVector]
case BinaryType =>
throw new IllegalArgumentException("Spark BinaryType is currently not supported.")
case _ => throw new IllegalArgumentException(s"No feature type tag mapping for Spark type $sparkType")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,41 @@ case object FeatureTypeSparkConverter {
case wt if wt <:< weakTypeOf[t.Text] => (value: Any) =>
if (value == null) FeatureTypeDefaults.Text.value else Some(value.asInstanceOf[String])

// Numerics
// Date & Time
case wt if wt <:< weakTypeOf[t.DateTime] => (value: Any) =>
if (value == null) FeatureTypeDefaults.DateTime.value else Some(value.asInstanceOf[Long])
case wt if wt <:< weakTypeOf[t.Date] => (value: Any) =>
value match {
case null => FeatureTypeDefaults.Date.value
case v: Int => Some(v.toLong)
case v: Long => Some(v)
case _ => throw new IllegalArgumentException(s"Date type mapping is not defined for ${value.getClass}")
}

// Numerals
case wt if wt <:< weakTypeOf[t.RealNN] => (value: Any) =>
if (value == null) None else Some(value.asInstanceOf[Double])
value match {
case null => None
case v: Float => Some(v.toDouble)
case v: Double => Some(v)
case _ => throw new IllegalArgumentException(s"RealNN type mapping is not defined for ${value.getClass}")
}
case wt if wt <:< weakTypeOf[t.Real] => (value: Any) =>
if (value == null) FeatureTypeDefaults.Real.value else Some(value.asInstanceOf[Double])
value match {
case null => FeatureTypeDefaults.Real.value
case v: Float => Some(v.toDouble)
case v: Double => Some(v)
case _ => throw new IllegalArgumentException(s"Real type mapping is not defined for ${value.getClass}")
}
case wt if wt <:< weakTypeOf[t.Integral] => (value: Any) =>
if (value == null) FeatureTypeDefaults.Integral.value else Some(value.asInstanceOf[Long])
value match {
case null => FeatureTypeDefaults.Integral.value
case v: Byte => Some(v.toLong)
case v: Short => Some(v.toLong)
case v: Int => Some(v.toLong)
case v: Long => Some(v)
case _ => throw new IllegalArgumentException(s"Integral type mapping is not defined for ${value.getClass}")
}
case wt if wt <:< weakTypeOf[t.Binary] => (value: Any) =>
if (value == null) FeatureTypeDefaults.Binary.value else Some(value.asInstanceOf[Boolean])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.features

import com.salesforce.op.test.TestSparkContext
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.sql.types._
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

import scala.reflect.runtime.universe._

/**
 * Checks the Spark `DataType` to feature type tag mapping returned by
 * `FeatureSparkTypes.featureTypeTagOf`, for primitive, collection, unsupported and unknown types.
 */
@RunWith(classOf[JUnitRunner])
class FeatureSparkTypeTest extends FlatSpec with TestSparkContext {

  // Expected tags for primitive Spark types when the column is NOT nullable
  // (DoubleType / FloatType are expected to resolve to RealNN in that case).
  val sparkTypeToTypeTagMappings = Seq(
    (DoubleType, weakTypeTag[types.RealNN]), (FloatType, weakTypeTag[types.RealNN]),
    (LongType, weakTypeTag[types.Integral]), (IntegerType, weakTypeTag[types.Integral]),
    (ShortType, weakTypeTag[types.Integral]), (ByteType, weakTypeTag[types.Integral]),
    (DateType, weakTypeTag[types.Date]), (TimestampType, weakTypeTag[types.DateTime]),
    (StringType, weakTypeTag[types.Text]), (BooleanType, weakTypeTag[types.Binary]),
    (VectorType, weakTypeTag[types.OPVector])
  )

  // Expected tags for array and map Spark types; these are checked with isNullable = true.
  val sparkCollectionTypeToTypeTagMappings = Seq(
    (ArrayType(LongType, containsNull = true), weakTypeTag[types.DateList]),
    (ArrayType(DoubleType, containsNull = true), weakTypeTag[types.Geolocation]),
    (MapType(StringType, StringType, valueContainsNull = true), weakTypeTag[types.TextMap]),
    (MapType(StringType, DoubleType, valueContainsNull = true), weakTypeTag[types.RealMap]),
    (MapType(StringType, LongType, valueContainsNull = true), weakTypeTag[types.IntegralMap]),
    (MapType(StringType, BooleanType, valueContainsNull = true), weakTypeTag[types.BinaryMap]),
    (MapType(StringType, ArrayType(StringType, containsNull = true), valueContainsNull = true),
      weakTypeTag[types.MultiPickListMap]),
    (MapType(StringType, ArrayType(DoubleType, containsNull = true), valueContainsNull = true),
      weakTypeTag[types.GeolocationMap])
  )

  // Expected tags for the real-valued Spark types when the column IS nullable
  // (DoubleType / FloatType are expected to resolve to Real instead of RealNN).
  val sparkNullableTypeToTypeTagMappings = Seq(
    (DoubleType, weakTypeTag[types.Real]), (FloatType, weakTypeTag[types.Real])
  )

  Spec(FeatureSparkTypes.getClass) should "assign appropriate feature type tags for valid non-nullable types" in {
    sparkTypeToTypeTagMappings.foreach { case (sparkType, expectedTag) =>
      FeatureSparkTypes.featureTypeTagOf(sparkType, isNullable = false) shouldBe expectedTag
    }
  }

  it should "assign appropriate feature type tags for valid nullable types" in {
    sparkNullableTypeToTypeTagMappings.foreach { case (sparkType, expectedTag) =>
      FeatureSparkTypes.featureTypeTagOf(sparkType, isNullable = true) shouldBe expectedTag
    }
  }

  it should "assign appropriate feature type tags for collection types" in {
    sparkCollectionTypeToTypeTagMappings.foreach { case (sparkType, expectedTag) =>
      FeatureSparkTypes.featureTypeTagOf(sparkType, isNullable = true) shouldBe expectedTag
    }
  }

  it should "throw error for unsupported types" in {
    // BinaryType is expected to be rejected with its own dedicated message.
    val error = intercept[IllegalArgumentException](FeatureSparkTypes.featureTypeTagOf(BinaryType, isNullable = false))
    error.getMessage shouldBe "Spark BinaryType is currently not supported."
  }

  it should "throw error for unknown types" in {
    // NullType stands in for a Spark type that has no feature type mapping.
    val unknownType = NullType
    val error = intercept[IllegalArgumentException](FeatureSparkTypes.featureTypeTagOf(unknownType, isNullable = false))
    error.getMessage shouldBe s"No feature type tag mapping for Spark type $unknownType"
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class FeatureTypeSparkConverterTest
)
val bogusNames = Gen.alphaNumStr

// Table-driven fixtures used by the numeric and date/time conversion properties of this suite.
// NOTE(review): each table mixes numeric literal types without an explicit element type, so under
// Scala 2 numeric widening the rows are presumably inferred as one common type (Long / Double);
// the Byte/Short/Int/Float inputs may therefore not be exercised as such — confirm.

// Upper bounds of the integral primitive types; also used as invalid input for the Real converter.
val naturalNumbers = Table("NaturalNumbers", Byte.MaxValue, Short.MaxValue, Int.MaxValue, Long.MaxValue)

// Upper bounds of the floating point types; also used as invalid input for Integral and Date.
val realNumbers = Table("RealNumbers", Float.MaxValue, Double.MaxValue)

// The same instant written as an Int literal and as a Long literal.
val dateTimeValues = Table("DateTimeVals", 300, 300L)

// Every entry of the featureTypeConverters table must be a FeatureTypeSparkConverter instance.
property("is a feature type converter") {
  forAll(featureTypeConverters) { converter =>
    converter shouldBe a[FeatureTypeSparkConverter[_]]
  }
}
Expand Down Expand Up @@ -94,4 +100,100 @@ class FeatureTypeSparkConverterTest
}
)
}

// The Integral converter must wrap every row of naturalNumbers into an Integral feature.
property("converts Natural Number of Byte/Short/Int/Long ranges to Integral valued feature type") {
  forAll(naturalNumbers) { number =>
    val feature = FeatureTypeSparkConverter[Integral]().fromSpark(number)
    feature shouldBe a[Integral]
  }
}
// The value carried by the resulting Integral feature must be a Long.
property("converts Natural Number of Byte/Short/Int/Long ranges to Long range Integral feature") {
  forAll(naturalNumbers) { number =>
    val feature = FeatureTypeSparkConverter[Integral]().fromSpark(number)
    feature.value.get shouldBe a[java.lang.Long]
  }
}
// Rows of realNumbers are not valid Integral input and must be rejected with a descriptive message.
property("raises error for bad Natural Number") {
  forAll(realNumbers) { notIntegral =>
    val thrown = intercept[IllegalArgumentException] {
      FeatureTypeSparkConverter[Integral]().fromSpark(notIntegral)
    }
    val expectedMessage =
      s"Integral type mapping is not defined for class java.lang.${notIntegral.getClass.toString.capitalize}"
    thrown.getMessage shouldBe expectedMessage
  }
}

// The Real converter must wrap every row of realNumbers into a Real feature.
property("converts Real Numbers in float/double ranges to Real valued feature type") {
  forAll(realNumbers) { number =>
    val feature = FeatureTypeSparkConverter[Real]().fromSpark(number)
    feature shouldBe a[Real]
  }
}
// The value carried by the resulting Real feature must be a Double.
property("converts Real Numbers in float/double ranges to Double range Real feature") {
  forAll(realNumbers) { number =>
    val feature = FeatureTypeSparkConverter[Real]().fromSpark(number)
    feature.value.get shouldBe a[java.lang.Double]
  }
}
// Rows of naturalNumbers are not valid Real input and must be rejected with a descriptive message.
property("raises error for bad Real Number") {
  forAll(naturalNumbers) { notReal =>
    val thrown = intercept[IllegalArgumentException] {
      FeatureTypeSparkConverter[Real]().fromSpark(notReal)
    }
    val expectedMessage =
      s"Real type mapping is not defined for class java.lang.${notReal.getClass.toString.capitalize}"
    thrown.getMessage shouldBe expectedMessage
  }
}

// The RealNN converter must wrap every row of realNumbers into a RealNN feature.
property("converts Real Numbers in float/double ranges to RealNN valued feature type") {
  forAll(realNumbers) { number =>
    FeatureTypeSparkConverter[RealNN]().fromSpark(number) shouldBe a[RealNN]
  }
}
// The value carried by the resulting RealNN feature must be a Double.
property("converts Real Numbers in float/double ranges Double range RealNN feature") {
  forAll(realNumbers) { number =>
    FeatureTypeSparkConverter[RealNN]().fromSpark(number).value.get shouldBe a[java.lang.Double]
  }
}
// A null input must be rejected because RealNN cannot be empty.
// The input is always null and does not depend on any table row, so the check runs once
// instead of being repeated for every row of an unrelated table.
property("raises error for empty RealNN Number") {
  val thrown = intercept[NonNullableEmptyException] {
    FeatureTypeSparkConverter[RealNN]().fromSpark(null)
  }
  thrown.getMessage shouldBe "RealNN cannot be empty"
}

// The Date converter must wrap every row of dateTimeValues into a Date feature.
property("converts date denoted using int/long ranges to date feature types") {
  forAll(dateTimeValues) { instant =>
    val feature = FeatureTypeSparkConverter[Date]().fromSpark(instant)
    feature shouldBe a[Date]
  }
}
// The value carried by the resulting Date feature must be a Long.
property("converts date denoted using int/long ranges to Long range date feature") {
  forAll(dateTimeValues) { instant =>
    val feature = FeatureTypeSparkConverter[Date]().fromSpark(instant)
    feature.value.get shouldBe a[java.lang.Long]
  }
}
// Rows of realNumbers are not valid Date input and must be rejected with a descriptive message.
property("raises error for bad date values") {
  forAll(realNumbers) { notADate =>
    val thrown = intercept[IllegalArgumentException] {
      FeatureTypeSparkConverter[Date]().fromSpark(notADate)
    }
    val expectedMessage =
      s"Date type mapping is not defined for class java.lang.${notADate.getClass.toString.capitalize}"
    thrown.getMessage shouldBe expectedMessage
  }
}

// The DateTime converter must wrap every row of dateTimeValues into a DateTime feature.
property("converts timestamp denoted using long range to datetime feature type") {
  forAll(dateTimeValues) { instant =>
    val feature = FeatureTypeSparkConverter[DateTime]().fromSpark(instant)
    feature shouldBe a[DateTime]
  }
}
// The converted DateTime feature is also expected to be an instance of Date.
property("converts timestamp denoted using long range to date super feature type") {
  forAll(dateTimeValues) { instant =>
    val feature = FeatureTypeSparkConverter[DateTime]().fromSpark(instant)
    feature shouldBe a[Date]
  }
}
// The value carried by the resulting DateTime feature must be a Long.
property("converts timestamp denoted using long ranges to long range datetime feature") {
  forAll(dateTimeValues) { instant =>
    val feature = FeatureTypeSparkConverter[DateTime]().fromSpark(instant)
    feature.value.get shouldBe a[java.lang.Long]
  }
}

// A plain String must be converted into a Text feature that carries a String value.
property("converts string to text feature type") {
  val converted = FeatureTypeSparkConverter[Text]().fromSpark("Simple")
  val payload = converted.value.get
  converted shouldBe a[Text]
  payload shouldBe a[String]
}

// A Boolean must be converted into a Binary feature that carries a Boolean value.
property("converts a Boolean to Binary feature type") {
  val converted = FeatureTypeSparkConverter[Binary]().fromSpark(false)
  val payload = converted.value.get
  converted shouldBe a[Binary]
  payload shouldBe a[java.lang.Boolean]
}
}

0 comments on commit 978ece4

Please sign in to comment.