diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 9a338e51268f3..c6c45e7e098c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -98,6 +98,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ * Concrete implementations of SparkPlan should override doExecute instead. */ final def execute(): RDD[InternalRow] = { + assert(children.map(_.outputsUnsafeRows).distinct.length <= 1, + "Child operators should output rows in the same format") RDDOperationScope.withScope(sparkContext, nodeName, false, true) { doExecute() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index 9dfd7c7fa975b..2b10f5654ec9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -28,23 +28,35 @@ class RowFormatConvertersSuite extends SparkFunSuite { case c: ConvertFromUnsafe => c } + private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)) + assert(!outputsSafe.outputsUnsafeRows) + private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)) + assert(outputsUnsafe.outputsUnsafeRows) + test("planner should insert unsafe->safe conversions when required") { - val plan = Limit(10, UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))) + val plan = Limit(10, outputsUnsafe) val preparedPlan = TestSQLContext.prepareForExecution.execute(plan) assert(preparedPlan.children.head.isInstanceOf[ConvertFromUnsafe]) } test("filter can process unsafe rows") { - val plan = Filter(IsNull(null), UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))) - assert(plan.child.outputsUnsafeRows) + val plan = Filter(IsNull(null), outputsUnsafe) val preparedPlan = TestSQLContext.prepareForExecution.execute(plan) assert(getConverters(preparedPlan).isEmpty) + assert(preparedPlan.outputsUnsafeRows) } test("filter can process safe rows") { - val plan = Filter(IsNull(null), ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))) - assert(!plan.child.outputsUnsafeRows) + val plan = Filter(IsNull(null), outputsSafe) val preparedPlan = TestSQLContext.prepareForExecution.execute(plan) assert(getConverters(preparedPlan).isEmpty) + assert(!preparedPlan.outputsUnsafeRows) + } + + test("execute() fails an assertion if inputs rows are of different formats") { + val e = intercept[AssertionError] { + Union(Seq(outputsSafe, outputsUnsafe)).execute() + } + assert(e.getMessage.contains("format")) } }