Fix SPARK-7858 by using output types for conversion.
JoshRosen committed May 26, 2015
1 parent 5a00e66 commit 6cd7366
Showing 3 changed files with 13 additions and 19 deletions.
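
For context: SPARK-7858 is a row-conversion bug in the data source scan path. The physical plan's output can be a pruned or reordered subset of the relation's full schema, but the conversion code built one Catalyst converter per relation column, so converters could be paired with the wrong values. A minimal sketch of the mismatch at the type level (the two-column schema is hypothetical):

import org.apache.spark.sql.types._

// Hypothetical relation schema: (id: Int, name: String).
val relationSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))

// After column pruning the scan only produces `name`, so the types that
// describe the rows it actually emits are:
val outputTypes: Seq[DataType] = Seq(StringType)

// relationSchema.map(_.dataType) == Seq(IntegerType, StringType): building
// converters from it pairs an IntegerType converter with String values.
// Building them from outputTypes, as this commit does, keeps them aligned.

The fix threads a Seq[DataType] of output types through RDDConversions instead of a StructType, as shown in the diffs below.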
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -392,7 +392,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     SparkPlan.currentContext.set(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
+    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
     DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }
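
At this call site nothing is pruned: attributeSeq is derived from the full reflected schema, so the output types are exactly the schema's field types in order. StructType is itself a Seq[StructField], which is why schema.map(_.dataType) is sufficient. A small sketch (field names hypothetical):

import org.apache.spark.sql.types._

// StructType extends Seq[StructField], so mapping over it yields the
// field types in declaration order.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

val outputTypes: Seq[DataType] = schema.map(_.dataType)
// outputTypes == Seq(LongType, StringType)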
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -23,27 +23,24 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow, SpecificMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], schema: StructType): RDD[Row] = {
+  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
         val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-        val schemaFields = schema.fields
-        assert(mutableRow.length == schemaFields.length,
-          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
+        val mutableRow = new SpecificMutableRow(outputTypes)
+        assert(mutableRow.length == outputTypes.length,
+          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
         bufferedIterator.map { r =>
           var i = 0
           while (i < mutableRow.length) {
@@ -60,19 +57,16 @@ object RDDConversions {
   /**
    * Convert the objects inside Row into the types Catalyst expected.
   */
-  def rowToRowRdd(data: RDD[Row], schema: StructType): RDD[Row] = {
+  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
         val bufferedIterator = iterator.buffered
-        val mutableRow = new GenericMutableRow(bufferedIterator.head.toSeq.toArray)
-        val schemaFields = schema.fields
-        assert(mutableRow.length == schemaFields.length,
-          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
+        val mutableRow = new SpecificMutableRow(outputTypes)
+        assert(mutableRow.length == outputTypes.length,
+          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
         bufferedIterator.map { r =>
           var i = 0
           while (i < mutableRow.length) {
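
Both methods share one per-partition pattern: build one converter per output column and one reusable mutable row up front, then fill the row column by column with a while loop to avoid per-row allocation. A plain-Scala sketch of that pattern, with an Array[Any] standing in for Catalyst's mutable row and plain functions standing in for the converters (all names hypothetical):

// Stand-in for the per-partition loop in productToRowRdd / rowToRowRdd.
def convertPartition[T](rows: Iterator[T],
                        converters: IndexedSeq[Any => Any],
                        getField: (T, Int) => Any): Iterator[IndexedSeq[Any]] = {
  val mutableRow = new Array[Any](converters.length) // allocated once, reused per row
  rows.map { r =>
    var i = 0
    while (i < mutableRow.length) {
      mutableRow(i) = converters(i)(getField(r, i)) // convert column i
      i += 1
    }
    mutableRow.toIndexedSeq // copy for safety; the real code re-emits the mutable row
  }
}

Re-emitting a single mutable row is safe inside Spark because each row is consumed downstream before the next one is produced; the sketch copies instead so it is safe to collect.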
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       output: Seq[Attribute],
       rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, relation.schema)
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
     } else {
       rdd
     }
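
This call site is where the bug actually bit: output reflects any column pruning done by the planner, while relation.schema always describes the full relation, and the conversion (applied only when relation.needConversion is true) must match the rows the scan really produces. A sketch of how the two diverge, using Catalyst's AttributeReference (column names hypothetical):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types._

// Full relation schema: (a: Long, b: String).
val relationSchema = StructType(Seq(
  StructField("a", LongType),
  StructField("b", StringType)))

// The planner pruned column `a`, so the scan only outputs `b`:
val output: Seq[Attribute] = Seq(AttributeReference("b", StringType)())

output.map(_.dataType)         // Seq(StringType) -- matches the scanned rows
relationSchema.map(_.dataType) // Seq(LongType, StringType) -- does not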
