Skip to content

Commit

Permalink
Add assertion if operators' input rows are in different formats
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 18, 2015
1 parent cabb703 commit 0e2d548
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Concrete implementations of SparkPlan should override doExecute instead.
*/
final def execute(): RDD[InternalRow] = {
assert(children.map(_.outputsUnsafeRows).distinct.length <= 1,
"Child operators should output rows in the same format")
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
doExecute()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,35 @@ class RowFormatConvertersSuite extends SparkFunSuite {
case c: ConvertFromUnsafe => c
}

private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(!outputsSafe.outputsUnsafeRows)
private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(outputsUnsafe.outputsUnsafeRows)

test("planner should insert unsafe->safe conversions when required") {
val plan = Limit(10, UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)))
val plan = Limit(10, outputsUnsafe)
val preparedPlan = TestSQLContext.prepareForExecution.execute(plan)
assert(preparedPlan.children.head.isInstanceOf[ConvertFromUnsafe])
}

test("filter can process unsafe rows") {
val plan = Filter(IsNull(null), UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)))
assert(plan.child.outputsUnsafeRows)
val plan = Filter(IsNull(null), outputsUnsafe)
val preparedPlan = TestSQLContext.prepareForExecution.execute(plan)
assert(getConverters(preparedPlan).isEmpty)
assert(preparedPlan.outputsUnsafeRows)
}

test("filter can process safe rows") {
val plan = Filter(IsNull(null), ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)))
assert(!plan.child.outputsUnsafeRows)
val plan = Filter(IsNull(null), outputsSafe)
val preparedPlan = TestSQLContext.prepareForExecution.execute(plan)
assert(getConverters(preparedPlan).isEmpty)
assert(!preparedPlan.outputsUnsafeRows)
}

test("execute() fails an assertion if inputs rows are of different formats") {
val e = intercept[AssertionError] {
Union(Seq(outputsSafe, outputsUnsafe)).execute()
}
assert(e.getMessage.contains("format"))
}
}

0 comments on commit 0e2d548

Please sign in to comment.