Skip to content

Commit

Permalink
Add Union unsafe support + tests to bump up test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 18, 2015
1 parent 6f79449 commit 2bb8da8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ case class Sample(
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output: Seq[Attribute] = children.head.output
override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows)
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
protected override def doExecute(): RDD[InternalRow] =
sparkContext.union(children.map(_.execute()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,17 @@ class RowFormatConvertersSuite extends SparkFunSuite {
}
assert(e.getMessage.contains("format"))
}

test("union requires all of its input rows' formats to agree") {
val plan = Union(Seq(outputsSafe, outputsUnsafe))
assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
val preparedPlan = TestSQLContext.prepareForExecution.execute(plan)
assert(!preparedPlan.outputsUnsafeRows)
}

test("union can process unsafe rows") {
val plan = Union(Seq(outputsUnsafe, outputsUnsafe))
val preparedPlan = TestSQLContext.prepareForExecution.execute(plan)
assert(preparedPlan.outputsUnsafeRows)
}
}

0 comments on commit 2bb8da8

Please sign in to comment.