Skip to content

Commit

Permalink
Added option to handle incremental collection, disabled by default
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Aug 8, 2014
1 parent 9a54de1 commit 623abde
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private[spark] object SQLConf {
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"
val INCREMENTAL_COLLECT_ENABLED = "spark.sql.thriftServer.incrementalCollect"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
Expand Down Expand Up @@ -104,6 +105,15 @@ trait SQLConf {
private[spark] def defaultSizeInBytes: Long = {
  // Fallback used when DEFAULT_SIZE_IN_BYTES is not set: one more than the auto-broadcast
  // join threshold.
  val fallbackSize = (autoBroadcastJoinThreshold + 1).toString
  val configuredSize = getConf(DEFAULT_SIZE_IN_BYTES, fallbackSize)
  configuredSize.toLong
}

/**
 * Whether the Hive Thrift server should fetch a query's result set incrementally, one partition
 * at a time. Doing so lowers the risk of running out of memory on the driver when the result set
 * may be large. The trade-off is that *the last* stage of the RDD DAG generated from the SQL
 * query plan is executed sequentially, which hurts performance.
 *
 * Read from the `INCREMENTAL_COLLECT_ENABLED` key; treated as "false" when the key is not set.
 */
private[spark] def incrementalCollectEnabled: Boolean = {
  val rawSetting = getConf(INCREMENTAL_COLLECT_ENABLED, "false")
  rawSetting.toBoolean
}

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,14 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage
logDebug(result.queryExecution.toString())
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
iter = result.queryExecution.toRdd.toLocalIterator
iter = {
val resultRdd = result.queryExecution.toRdd
if (hiveContext.incrementalCollectEnabled) {
resultRdd.toLocalIterator
} else {
resultRdd.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {
Expand Down

0 comments on commit 623abde

Please sign in to comment.