Skip to content

Commit

Permalink
[SPARK-6972][SQL] Add Coalesce to DataFrame
Browse files Browse the repository at this point in the history
Author: Michael Armbrust <[email protected]>

Closes #5545 from marmbrus/addCoalesce and squashes the following commits:

9fdf3f6 [Michael Armbrust] [SPARK-6972][SQL] Add Coalesce to DataFrame
  • Loading branch information
marmbrus authored and rxin committed Apr 17, 2015
1 parent e5949c2 commit 8220d52
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
14 changes: 14 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,20 @@ class DataFrame private[sql](
schema, needsConversion = false)
}

/**
 * Returns a new [[DataFrame]] whose rows are regrouped into `numPartitions` partitions.
 * As with coalesce on an [[RDD]], the result is a narrow dependency rather than a shuffle:
 * going from 1000 partitions down to 100, for example, makes each of the 100 new partitions
 * claim 10 of the existing ones.
 *
 * NOTE(review): the underlying RDD coalesce is called with only a partition count (no shuffle
 * requested), so asking for more partitions than currently exist presumably leaves the
 * partition count unchanged -- confirm against the RDD.coalesce documentation.
 *
 * @param numPartitions the target number of partitions
 * @group rdd
 */
override def coalesce(numPartitions: Int): DataFrame = {
  // Coalesce the physical row RDD first, then re-wrap it under the unchanged schema.
  val coalescedRows = queryExecution.toRdd.coalesce(numPartitions)
  sqlContext.createDataFrame(coalescedRows, schema, needsConversion = false)
}

/**
* Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
* @group dfops
Expand Down
2 changes: 2 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] {

// Abstract: takes a partition count and yields a DataFrame. The implementation lives in the
// implementing class, which is not visible in this hunk.
def repartition(numPartitions: Int): DataFrame

// Abstract: takes a partition count and yields a DataFrame.
// NOTE(review): presumably the declaration that DataFrame.coalesce overrides -- confirm.
def coalesce(numPartitions: Int): DataFrame

// Abstract, parameterless: yields a DataFrame. Semantics are defined by the implementing class.
def distinct: DataFrame
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest {
testData.select('key).collect().toSeq)
}

test("coalesce") {
  val keys = testData.select('key)
  val merged = keys.coalesce(1)

  // Coalescing to one partition must be reflected in the resulting RDD's partition array.
  assert(merged.rdd.partitions.length === 1)

  // The coalesced frame must still be queryable and return the same rows as the source.
  checkAnswer(merged.select('key), keys.collect().toSeq)
}

test("groupBy") {
checkAnswer(
testData2.groupBy("a").agg($"a", sum($"b")),
Expand Down

0 comments on commit 8220d52

Please sign in to comment.