Skip to content

Commit

Permalink
[SPARK-5323][SQL] Remove Row's Seq inheritance.
Browse files Browse the repository at this point in the history
Author: Reynold Xin <[email protected]>

Closes apache#4115 from rxin/row-seq and squashes the following commits:

e33abd8 [Reynold Xin] Fixed compilation error.
cceb650 [Reynold Xin] Python test fixes, and removal of WrapDynamic.
0334a52 [Reynold Xin] mkString.
9cdeb7d [Reynold Xin] Hive tests.
15681c2 [Reynold Xin] Fix more test cases.
ea9023a [Reynold Xin] Fixed a catalyst test.
c5e2cb5 [Reynold Xin] Minor patch up.
b9cab7c [Reynold Xin] [SPARK-5323][SQL] Remove Row's Seq inheritance.
  • Loading branch information
rxin authored and bomeng committed Jan 22, 2015
1 parent 47d8337 commit 125e4a1
Show file tree
Hide file tree
Showing 47 changed files with 1,018 additions and 956 deletions.
75 changes: 71 additions & 4 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import scala.util.hashing.MurmurHash3

import org.apache.spark.sql.catalyst.expressions.GenericRow


Expand All @@ -32,7 +34,7 @@ object Row {
* }
* }}}
*/
// Always matches: enables `case Row(a, b, ...) =>` patterns over a Row's values.
def unapplySeq(row: Row): Some[Seq[Any]] = Some(row.toSeq)

/**
* This method can be used to construct a [[Row]] with the given values.
Expand All @@ -43,6 +45,16 @@ object Row {
/**
 * This method can be used to construct a [[Row]] from a [[Seq]] of values.
 */
def fromSeq(values: Seq[Any]): Row = new GenericRow(values.toArray)

/**
 * This method can be used to construct a [[Row]] from a [[Product]]
 * (e.g. a tuple or a case class), using its fields in declaration order.
 */
def fromTuple(tuple: Product): Row = fromSeq(tuple.productIterator.toSeq)

/**
 * Merge multiple rows into a single row, one after another.
 */
def merge(rows: Row*): Row = {
  // TODO: Improve the performance of this if used in performance critical part.
  val concatenated = rows.flatMap(_.toSeq)
  new GenericRow(concatenated.toArray)
}
}


Expand Down Expand Up @@ -103,7 +115,13 @@ object Row {
*
* @group row
*/
trait Row extends Seq[Any] with Serializable {
trait Row extends Serializable {
/** Number of elements in the Row; alias of [[length]]. */
def size: Int = length

/** Number of elements in the Row. */
def length: Int

/**
* Returns the value at position i. If the value is null, null is returned. The following
* is a mapping between Spark SQL types and return types:
Expand Down Expand Up @@ -291,12 +309,61 @@ trait Row extends Seq[Any] with Serializable {

/** Returns true if there are any NULL values in this row. */
def anyNull: Boolean = {
  val len = length
  var i = 0
  // Plain while loop: hot path, avoids iterator allocation.
  while (i < len) {
    if (isNullAt(i)) { return true }
    i += 1
  }
  false
}

// Structural equality: two Rows are equal iff they have the same length and
// element-wise equal values. A null or non-Row argument is never equal.
override def equals(that: Any): Boolean = that match {
  case other: Row =>
    val len = this.length
    if (other.length != len) {
      false
    } else {
      var idx = 0
      var matches = true
      while (matches && idx < len) {
        if (apply(idx) != other.apply(idx)) {
          matches = false
        }
        idx += 1
      }
      matches
    }
  case _ => false  // also covers null: a type pattern never matches null
}

// Mirrors Scala's Seq hash code so a Row hashes like the equivalent Seq of
// its values (consistent with the element-wise equals above).
override def hashCode: Int = {
  val len = length
  var h = MurmurHash3.seqSeed
  var i = 0
  while (i < len) {
    h = MurmurHash3.mix(h, apply(i).##)
    i += 1
  }
  // i == number of elements mixed in.
  MurmurHash3.finalizeHash(h, i)
}

/* ---------------------- utility methods for Scala ---------------------- */

/**
 * Return a Scala Seq representing the row. Elements are placed in the same order in the Seq.
 */
def toSeq: Seq[Any]

/** Displays all elements of this row in a string (without a separator). */
def mkString: String = toSeq.mkString

/** Displays all elements of this row in a string using a separator string. */
def mkString(sep: String): String = toSeq.mkString(sep)

/**
 * Displays all elements of this row in a string using
 * start, end, and separator strings.
 */
def mkString(start: String, sep: String, end: String): String = toSeq.mkString(start, sep, end)
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ trait ScalaReflection {
}

/**
 * Converts each value of `r` via `convertToScala`, pairing values positionally
 * with the field data types declared in `schema`.
 */
def convertRowToScala(r: Row, schema: StructType): Row = {
  // TODO: This is very slow!!!
  new GenericRow(
    r.toSeq.zip(schema.fields.map(_.dataType))
      .map { case (value, dataType) => convertToScala(value, dataType) }
      .toArray)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,6 @@ package object dsl {
/** Filters the plan with a typed Scala UDF applied to the attribute named by `arg1`. */
def sfilter[T1](arg1: Symbol)(udf: (T1) => Boolean) =
Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), logicalPlan)

def sfilter(dynamicUdf: (DynamicRow) => Boolean) =
Filter(ScalaUdf(dynamicUdf, BooleanType, Seq(WrapDynamic(logicalPlan.output))), logicalPlan)

def sample(
fraction: Double,
withReplacement: Boolean = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
val casts = from.fields.zip(to.fields).map {
case (fromField, toField) => cast(fromField.dataType, toField.dataType)
}
buildCast[Row](_, row => Row(row.zip(casts).map {
// TODO: This is very slow!
buildCast[Row](_, row => Row(row.toSeq.zip(casts).map {
case (v, cast) => if (v == null) null else cast(v)
}: _*))
}
Expand Down
Loading

0 comments on commit 125e4a1

Please sign in to comment.