From c619f0a9a5cfdbb3586c4e9f8df71ae47fc22051 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Wed, 3 Sep 2014 13:16:05 +0800 Subject: [PATCH 1/4] Type Coercion should support every type to have null value --- .../catalyst/analysis/HiveTypeCoercion.scala | 23 +++++++++-------- .../analysis/HiveTypeCoercionSuite.scala | 25 ++++++++++++++++--- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 15eb5982a4a91..ee0975ddadea9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -26,10 +26,8 @@ object HiveTypeCoercion { // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types. // The conversion for integral and floating point types have a linear widening hierarchy: val numericPrecedence = - Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType) - // Boolean is only wider than Void - val booleanPrecedence = Seq(NullType, BooleanType) - val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil + Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType) + val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: Nil } /** @@ -55,12 +53,17 @@ trait HiveTypeCoercion { trait TypeWidening { def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = { - // Try and find a promotion rule that contains both types in question. 
- val applicableConversion = - HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2)) - - // If found return the widest common type, otherwise None - applicableConversion.map(_.filter(t => t == t1 || t == t2).last) + val valueTypes = Seq(t1, t2).filter(t => t != NullType) + if (valueTypes.distinct.size > 1) { + // Try and find a promotion rule that contains both types in question. + val applicableConversion = + HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2)) + + // If found return the widest common type, otherwise None + applicableConversion.map(_.filter(t => t == t1 || t == t2).last) + } else { + Some(if (valueTypes.size == 0) NullType else valueTypes.head) + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala index b9e0f8e9dcc5f..aed457e1accee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala @@ -26,7 +26,7 @@ class HiveTypeCoercionSuite extends FunSuite { val rules = new HiveTypeCoercion { } import rules._ - test("tightest common bound for numeric and boolean types") { + test("tightest common bound for types") { def widenTest(t1: DataType, t2: DataType, tightestCommon: Option[DataType]) { var found = WidenTypes.findTightestCommonType(t1, t2) assert(found == tightestCommon, @@ -37,6 +37,9 @@ class HiveTypeCoercionSuite extends FunSuite { s"Expected $tightestCommon as tightest common type for $t2 and $t1, found $found") } + // Null + widenTest(NullType, NullType, Some(NullType)) + // Boolean widenTest(NullType, BooleanType, Some(BooleanType)) widenTest(BooleanType, BooleanType, Some(BooleanType)) @@ -60,12 +63,28 @@ class HiveTypeCoercionSuite extends FunSuite { widenTest(DoubleType, DoubleType, 
Some(DoubleType)) // Integral mixed with floating point. - widenTest(NullType, FloatType, Some(FloatType)) - widenTest(NullType, DoubleType, Some(DoubleType)) widenTest(IntegerType, FloatType, Some(FloatType)) widenTest(IntegerType, DoubleType, Some(DoubleType)) widenTest(IntegerType, DoubleType, Some(DoubleType)) widenTest(LongType, FloatType, Some(FloatType)) widenTest(LongType, DoubleType, Some(DoubleType)) + + // StringType + widenTest(NullType, StringType, Some(StringType)) + widenTest(StringType, StringType, Some(StringType)) + widenTest(IntegerType, StringType, None) + widenTest(LongType, StringType, None) + + // TimestampType + widenTest(NullType, TimestampType, Some(TimestampType)) + widenTest(TimestampType, TimestampType, Some(TimestampType)) + widenTest(IntegerType, TimestampType, None) + widenTest(StringType, TimestampType, None) + + // ComplexType + widenTest(NullType, MapType(IntegerType, StringType, false), Some(MapType(IntegerType, StringType, false))) + widenTest(NullType, StructType(Seq()), Some(StructType(Seq()))) + widenTest(StringType, MapType(IntegerType, StringType, true), None) + widenTest(ArrayType(IntegerType), StructType(Seq()), None) } } From ef6f986093b90d19bc98772a752b4809686c4347 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Thu, 4 Sep 2014 14:07:49 +0800 Subject: [PATCH 2/4] handle the missing (BooleanType, BooleanType) case in JsonRDD compatibleType --- .../src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 1c0b03c684f10..4444054f8d21d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -135,7 +135,7 @@ private[sql] object JsonRDD extends Logging { if (returnType.isDefined) { returnType.get } else { - // t1 or t2 is a StructType, ArrayType, or an
unexpected type. + // t1 or t2 is a StructType, ArrayType, BooleanType, or an unexpected type. (t1, t2) match { case (other: DataType, NullType) => other case (NullType, other: DataType) => other @@ -155,6 +155,7 @@ private[sql] object JsonRDD extends Logging { ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2) // TODO: We should use JsonObjectStringType to mark that values of field will be // strings and every string is a Json object. + case (BooleanType, BooleanType) => BooleanType case (_, _) => StringType } } From 832e640b47728e036a3f6789b5bb92a319a7aed6 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 4 Sep 2014 18:40:01 -0700 Subject: [PATCH 3/4] reduce code duplication --- .../catalyst/analysis/HiveTypeCoercion.scala | 37 ++++++------- .../analysis/HiveTypeCoercionSuite.scala | 7 +-- .../org/apache/spark/sql/json/JsonRDD.scala | 52 ++++++++----------- 3 files changed, 43 insertions(+), 53 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index ee0975ddadea9..fdfa8d55b72be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -28,6 +28,20 @@ object HiveTypeCoercion { val numericPrecedence = Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType) val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: Nil + + def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = { + val valueTypes = Seq(t1, t2).filter(t => t != NullType) + if (valueTypes.distinct.size > 1) { + // Try and find a promotion rule that contains both types in question. 
+ val applicableConversion = + HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2)) + + // If found return the widest common type, otherwise None + applicableConversion.map(_.filter(t => t == t1 || t == t2).last) + } else { + Some(if (valueTypes.size == 0) NullType else valueTypes.head) + } + } } /** @@ -51,22 +65,6 @@ trait HiveTypeCoercion { Division :: Nil - trait TypeWidening { - def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = { - val valueTypes = Seq(t1, t2).filter(t => t != NullType) - if (valueTypes.distinct.size > 1) { - // Try and find a promotion rule that contains both types in question. - val applicableConversion = - HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2)) - - // If found return the widest common type, otherwise None - applicableConversion.map(_.filter(t => t == t1 || t == t2).last) - } else { - Some(if (valueTypes.size == 0) NullType else valueTypes.head) - } - } - } - /** * Applies any changes to [[AttributeReference]] data types that are made by other rules to * instances higher in the query tree. @@ -147,7 +145,8 @@ trait HiveTypeCoercion { * - LongType to FloatType * - LongType to DoubleType */ - object WidenTypes extends Rule[LogicalPlan] with TypeWidening { + object WidenTypes extends Rule[LogicalPlan] { + import HiveTypeCoercion._ def apply(plan: LogicalPlan): LogicalPlan = plan transform { case u @ Union(left, right) if u.childrenResolved && !u.resolved => @@ -343,7 +342,9 @@ trait HiveTypeCoercion { /** * Coerces the type of different branches of a CASE WHEN statement to a common type. 
*/ - object CaseWhenCoercion extends Rule[LogicalPlan] with TypeWidening { + object CaseWhenCoercion extends Rule[LogicalPlan] { + import HiveTypeCoercion._ + def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case cw @ CaseWhen(branches) if !cw.resolved && !branches.exists(!_.resolved) => val valueTypes = branches.sliding(2, 2).map { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala index aed457e1accee..ba8b853b6f99e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala @@ -23,16 +23,13 @@ import org.apache.spark.sql.catalyst.types._ class HiveTypeCoercionSuite extends FunSuite { - val rules = new HiveTypeCoercion { } - import rules._ - test("tightest common bound for types") { def widenTest(t1: DataType, t2: DataType, tightestCommon: Option[DataType]) { - var found = WidenTypes.findTightestCommonType(t1, t2) + var found = HiveTypeCoercion.findTightestCommonType(t1, t2) assert(found == tightestCommon, s"Expected $tightestCommon as tightest common type for $t1 and $t2, found $found") // Test both directions to make sure the widening is symmetric. 
- found = WidenTypes.findTightestCommonType(t2, t1) + found = HiveTypeCoercion.findTightestCommonType(t2, t1) assert(found == tightestCommon, s"Expected $tightestCommon as tightest common type for $t2 and $t1, found $found") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 4444054f8d21d..bbe6335f85156 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -125,39 +125,31 @@ private[sql] object JsonRDD extends Logging { * Returns the most general data type for two given data types. */ private[json] def compatibleType(t1: DataType, t2: DataType): DataType = { - // Try and find a promotion rule that contains both types in question. - val applicableConversion = HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p - .contains(t2)) - - // If found return the widest common type, otherwise None - val returnType = applicableConversion.map(_.filter(t => t == t1 || t == t2).last) - - if (returnType.isDefined) { - returnType.get - } else { - // t1 or t2 is a StructType, ArrayType, BooleanType, or an unexpected type. - (t1, t2) match { - case (other: DataType, NullType) => other - case (NullType, other: DataType) => other - case (StructType(fields1), StructType(fields2)) => { - val newFields = (fields1 ++ fields2).groupBy(field => field.name).map { - case (name, fieldTypes) => { - val dataType = fieldTypes.map(field => field.dataType).reduce( - (type1: DataType, type2: DataType) => compatibleType(type1, type2)) - StructField(name, dataType, true) + HiveTypeCoercion.findTightestCommonType(t1,t2) match { + case Some(commonType) => commonType + case None => + // t1 or t2 is a StructType, ArrayType, BooleanType, or an unexpected type. 
+ (t1, t2) match { + case (other: DataType, NullType) => other + case (NullType, other: DataType) => other + case (StructType(fields1), StructType(fields2)) => { + val newFields = (fields1 ++ fields2).groupBy(field => field.name).map { + case (name, fieldTypes) => { + val dataType = fieldTypes.map(field => field.dataType).reduce( + (type1: DataType, type2: DataType) => compatibleType(type1, type2)) + StructField(name, dataType, true) + } } + StructType(newFields.toSeq.sortBy { + case StructField(name, _, _) => name + }) } - StructType(newFields.toSeq.sortBy { - case StructField(name, _, _) => name - }) + case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) => + ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2) + // TODO: We should use JsonObjectStringType to mark that values of field will be + // strings and every string is a Json object. + case (_, _) => StringType } - case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) => - ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2) - // TODO: We should use JsonObjectStringType to mark that values of field will be - // strings and every string is a Json object. 
- case (BooleanType, BooleanType) => BooleanType - case (_, _) => StringType - } } } From c6241de74b75a750be89ba4b1635e388781d8bcf Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Fri, 5 Sep 2014 15:27:12 +0800 Subject: [PATCH 4/4] minor code cleanup --- .../src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index bbe6335f85156..70062eae3b7ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -125,10 +125,10 @@ private[sql] object JsonRDD extends Logging { * Returns the most general data type for two given data types. */ private[json] def compatibleType(t1: DataType, t2: DataType): DataType = { - HiveTypeCoercion.findTightestCommonType(t1,t2) match { + HiveTypeCoercion.findTightestCommonType(t1, t2) match { case Some(commonType) => commonType case None => - // t1 or t2 is a StructType, ArrayType, BooleanType, or an unexpected type. + // t1 or t2 is a StructType, ArrayType, or an unexpected type. (t1, t2) match { case (other: DataType, NullType) => other case (NullType, other: DataType) => other