Reuses Row object in ExistingRdd.productToRowRdd()
liancheng committed Apr 17, 2014
1 parent bb76eae commit 52acec9
Showing 1 changed file with 24 additions and 3 deletions.
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
-
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
   override def output = projectList.map(_.toAttribute)
 
@@ -143,8 +142,30 @@ object ExistingRdd {
   }
 
   def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
-    // TODO: Reuse the row, don't use map on the product iterator. Maybe code gen?
-    data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
+    data.mapPartitions { iterator =>
+      if (iterator.isEmpty) {
+        Iterator.empty
+      } else {
+        val first = iterator.next()
+        val mutableRow = new GenericMutableRow(first.productArity)
+
+        var i = 0
+        while (i < mutableRow.length) {
+          mutableRow(i) = first.productElement(i)
+          i += 1
+        }
+
+        Iterator.single(mutableRow) ++ iterator.map { r =>
+          var i = 0
+          while (i < mutableRow.length) {
+            mutableRow(i) = r.productElement(i)
+            i += 1
+          }
+
+          mutableRow
+        }
+      }
+    }
   }
 
   def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
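
The pattern this commit introduces can be exercised outside Spark. Below is a minimal, hypothetical sketch of the same row-reuse idea: a single mutable buffer is filled in place for every element of a partition's iterator instead of allocating a new row per record. A plain Array[Any] stands in for GenericMutableRow, and the names RowReuseSketch and productToReusedRows are illustrative, not part of the commit.

object RowReuseSketch {
  // Standalone sketch of the row-reuse pattern, using a plain Array[Any]
  // so it runs without Spark on the classpath.
  def productToReusedRows[A <: Product](iterator: Iterator[A]): Iterator[Array[Any]] = {
    if (iterator.isEmpty) {
      Iterator.empty
    } else {
      val first = iterator.next()
      val buffer = new Array[Any](first.productArity)

      // Copies one product's fields into the shared buffer, avoiding a fresh
      // allocation per element.
      def fill(r: Product): Array[Any] = {
        var i = 0
        while (i < buffer.length) {
          buffer(i) = r.productElement(i)
          i += 1
        }
        buffer
      }

      // The same buffer instance is handed out for every element, so a
      // consumer that needs to retain a row must copy it (e.g. buffer.clone()).
      Iterator.single(fill(first)) ++ iterator.map(fill)
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = productToReusedRows(Iterator((1, "a"), (2, "b"), (3, "c")))
    rows.foreach(r => println(r.mkString(", ")))
  }
}

As with the reuse in the commit itself, the trade-off is that downstream code must copy the row before holding onto it, since every element of the returned iterator is the same object.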
