[SPARK-7858] [SQL] Use output schema, not relation schema, for data source input conversion

In `DataSourceStrategy.createPhysicalRDD`, we use the relation schema as the target schema for converting incoming rows into Catalyst rows.  However, we should be using the output schema instead, since our scan might return a subset of the relation's columns.
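
For illustration, here is a minimal sketch (not part of the patch; it borrows the `a`/`b`/`p1` column names from the regression test below) of why converters built from the relation schema fall out of alignment when the scan is column-pruned:

```scala
import org.apache.spark.sql.types._

// Full relation schema: three columns.
val relationSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("p1", IntegerType)))

// A column-pruned scan may return only (b, p1): each incoming row has two fields.
val outputTypes: Seq[DataType] = Seq(StringType, IntegerType)

// Converters derived from relationSchema would pair field 0 of the scanned row
// (a String value for "b") with IntegerType; converters derived from outputTypes
// stay aligned with the rows the scan actually produces.
```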

This patch incorporates apache#6414 by liancheng, which fixes an issue in `SimpleTextRelation` that prevented this bug from being caught by our old tests:

> In `SimpleTextRelation`, we set `needsConversion` to `true`, indicating that values produced by this testing relation are of Scala types and need to be converted to Catalyst types when necessary. However, we also used `Cast` to convert strings to the expected data types, and `Cast` always produces values of Catalyst types, so no conversion was actually performed. This PR makes `SimpleTextRelation` produce Scala values so that the data conversion code paths can be properly tested.
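
As a small illustration (assuming Spark 1.4-era Catalyst internals, and using the same calls as the patched `SimpleTextRelation` below):

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.StringType

// `Cast(...).eval()` already yields a Catalyst-internal value (e.g. UTF8String rather
// than java.lang.String), so rows built this way never exercised the conversion path.
val catalystValue = Cast(Literal("foo"), StringType).eval()

// Converting back to a Scala value (as the patched SimpleTextRelation does) forces the
// Scala-to-Catalyst conversion code path to actually run during the test.
val scalaValue = CatalystTypeConverters.convertToScala(catalystValue, StringType)  // "foo"
```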

Closes apache#5986.

Author: Josh Rosen <[email protected]>
Author: Cheng Lian <[email protected]>

Closes apache#6400 from JoshRosen/SPARK-7858 and squashes the following commits:

e71c866 [Josh Rosen] Re-fix bug so that the tests pass again
56b13e5 [Josh Rosen] Add regression test to hadoopFsRelationSuites
2169a0f [Josh Rosen] Remove use of SpecificMutableRow and BufferedIterator
6cd7366 [Josh Rosen] Fix SPARK-7858 by using output types for conversion.
5a00e66 [Josh Rosen] Add assertions in order to reproduce SPARK-7858
8ba195c [Cheng Lian] Merge 9968fba into 6166473
9968fba [Cheng Lian] Tests the data type conversion code paths
JoshRosen authored and nemccarthy committed Jun 19, 2015
1 parent a03081b commit d2f92f5
Showing 5 changed files with 37 additions and 41 deletions.
@@ -392,7 +392,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     SparkPlan.currentContext.set(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
+    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
     DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }

@@ -21,63 +21,49 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}

 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], schema: StructType): RDD[Row] = {
+  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-        val schemaFields = schema.fields.toArray
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r.productElement(i))
-            i += 1
-          }
-
-          mutableRow
+        val numColumns = outputTypes.length
+        val mutableRow = new GenericMutableRow(numColumns)
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+        iterator.map { r =>
+          var i = 0
+          while (i < numColumns) {
+            mutableRow(i) = converters(i)(r.productElement(i))
+            i += 1
+          }
+
+          mutableRow
         }
       }
     }
   }

   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], schema: StructType): RDD[Row] = {
+  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new GenericMutableRow(bufferedIterator.head.toSeq.toArray)
-        val schemaFields = schema.fields.toArray
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r(i))
-            i += 1
-          }
-
-          mutableRow
+        val numColumns = outputTypes.length
+        val mutableRow = new GenericMutableRow(numColumns)
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+        iterator.map { r =>
+          var i = 0
+          while (i < numColumns) {
+            mutableRow(i) = converters(i)(r(i))
+            i += 1
+          }
+
+          mutableRow
         }
       }
     }
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       output: Seq[Attribute],
      rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, relation.schema)
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
     } else {
       rdd
     }
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}

 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
@@ -108,7 +109,10 @@ class SimpleTextRelation(

     sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
       Row(record.split(",").zip(fields).map { case (value, dataType) =>
-        Cast(Literal(value), dataType).eval()
+        // `Cast`ed values are always of Catalyst types (i.e. UTF8String instead of String, etc.)
+        val catalystValue = Cast(Literal(value), dataType).eval()
+        // Here we're converting Catalyst values to Scala values to test `needsConversion`
+        CatalystTypeConverters.convertToScala(catalystValue, dataType)
       }: _*)
     }
   }
@@ -76,6 +76,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
       for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))

+    // Project many copies of columns with different types (reproduction for SPARK-7858)
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'b, 'b, 'b, 'p1, 'p1, 'p1, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar"))
+        yield Row(s"val_$i", s"val_$i", s"val_$i", s"val_$i", 1, 1, 1, 1))
+
     // Self-join
     df.registerTempTable("t")
     withTempTable("t") {
