Skip to content

Commit

Permalink
[SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.
Browse files Browse the repository at this point in the history

In cases like `Limit` and `TakeOrdered`, `executeCollect()` makes optimizations that `execute().collect()` will not.

Author: Cheng Lian <[email protected]>

Closes #939 from liancheng/spark-1958 and squashes the following commits:

bdc4a14 [Cheng Lian] Copy rows to present immutable data to users
8250976 [Cheng Lian] Added return type explicitly for public API
192a25c [Cheng Lian] [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.
  • Loading branch information
liancheng authored and rxin committed Jun 2, 2014
1 parent 9a5d482 commit d000ca9
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ class SchemaRDD(
new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
}

// =======================================================================
// Overridden RDD actions
// =======================================================================

override def collect(): Array[Row] = {
  // Delegate to the physical plan's executeCollect() rather than the generic
  // execute().collect() path, so plan-specific optimizations (e.g. Limit,
  // TakeOrdered) take effect.
  queryExecution.executedPlan.executeCollect()
}

// =======================================================================
// Base RDD functions that do NOT change schema
// =======================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
/**
 * Runs this query returning the result as an array.
 *
 * Each row is copied before being returned: physical operators may reuse a
 * single mutable row object while iterating, so copying is required to
 * present immutable data to callers.
 */
def executeCollect(): Array[Row] = execute().map(_.copy()).collect()

/** Wraps `values` in a [[GenericRow]] backed by a fresh array copy. */
protected def buildRow(values: Seq[Any]): Row = {
  new GenericRow(values.toArray)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
}

test("insert (appending) to same table via Scala API") {
sql("INSERT INTO testsource SELECT * FROM testsource").collect()
sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
assert(double_rdd != null)
assert(double_rdd.size === 30)
Expand Down

0 comments on commit d000ca9

Please sign in to comment.