diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 64e90be4d460f..5b2e46962cd3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -96,11 +96,18 @@ case class Exchange( val cannotUseSqlSerializer2 = (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) || newOrdering.nonEmpty + // It is true when there is no field that needs to be write out. + // For now, we will not use SparkSqlSerializer2 when noField is true. + val noField = + (keySchema == null || keySchema.length == 0) && + (valueSchema == null || valueSchema.length == 0) + val useSqlSerializer2 = - child.sqlContext.conf.useSqlSerializer2 && // SparkSqlSerializer2 is enabled. - !cannotUseSqlSerializer2 && // Safe to use Serializer2. - SparkSqlSerializer2.support(keySchema) && // The schema of key is supported. - SparkSqlSerializer2.support(valueSchema) // The schema of value is supported. + child.sqlContext.conf.useSqlSerializer2 && // SparkSqlSerializer2 is enabled. + !cannotUseSqlSerializer2 && // Safe to use Serializer2. + SparkSqlSerializer2.support(keySchema) && // The schema of key is supported. + SparkSqlSerializer2.support(valueSchema) && // The schema of value is supported. + !noField val serializer = if (useSqlSerializer2) { logInfo("Using SparkSqlSerializer2.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala index 3bb78a8ccc9d0..27f063d73a9a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala @@ -155,6 +155,11 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll df.map(r => r.getString(0)).collect().toSeq === table("shuffle").select("col0").map(r => r.getString(0)).collect().sorted.toSeq) } + + test("no map output field") { + val df = sql(s"SELECT 1 + 1 FROM shuffle") + checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer]) + } } /** Tests SparkSqlSerializer2 with sort based shuffle without sort merge. */