Skip to content

Commit

Permalink
When no filed is emitted to shuffle, use SparkSqlSerializer for now.
Browse files Browse the repository at this point in the history
  • Loading branch information
yhuai committed Apr 18, 2015
1 parent 9f1ed92 commit 50e0c3d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,18 @@ case class Exchange(
val cannotUseSqlSerializer2 =
(sortBasedShuffleOn && numPartitions > bypassMergeThreshold) || newOrdering.nonEmpty

// It is true when there is no field that needs to be write out.
// For now, we will not use SparkSqlSerializer2 when noField is true.
val noField =
(keySchema == null || keySchema.length == 0) &&
(valueSchema == null || valueSchema.length == 0)

val useSqlSerializer2 =
child.sqlContext.conf.useSqlSerializer2 && // SparkSqlSerializer2 is enabled.
!cannotUseSqlSerializer2 && // Safe to use Serializer2.
SparkSqlSerializer2.support(keySchema) && // The schema of key is supported.
SparkSqlSerializer2.support(valueSchema) // The schema of value is supported.
child.sqlContext.conf.useSqlSerializer2 && // SparkSqlSerializer2 is enabled.
!cannotUseSqlSerializer2 && // Safe to use Serializer2.
SparkSqlSerializer2.support(keySchema) && // The schema of key is supported.
SparkSqlSerializer2.support(valueSchema) && // The schema of value is supported.
!noField

val serializer = if (useSqlSerializer2) {
logInfo("Using SparkSqlSerializer2.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
df.map(r => r.getString(0)).collect().toSeq ===
table("shuffle").select("col0").map(r => r.getString(0)).collect().sorted.toSeq)
}

test("no map output field") {
val df = sql(s"SELECT 1 + 1 FROM shuffle")
checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer])
}
}

/** Tests SparkSqlSerializer2 with sort based shuffle without sort merge. */
Expand Down

0 comments on commit 50e0c3d

Please sign in to comment.