Skip to content

Commit

Permalink
[SPARK-21041][SQL] SparkSession.range should be consistent with Spark…
Browse files Browse the repository at this point in the history
…Context.range

## What changes were proposed in this pull request?

This PR fixes the inconsistency in `SparkSession.range`.

**BEFORE**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array(9223372036854775804, 9223372036854775805, 9223372036854775806)
```

**AFTER**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array()
```

## How was this patch tested?

Pass the Jenkins with newly added test cases.

Author: Dongjoon Hyun <[email protected]>

Closes #18257 from dongjoon-hyun/SPARK-21041.

(cherry picked from commit a92e095)
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
dongjoon-hyun authored and cloud-fan committed Jun 12, 2017
1 parent a4d78e4 commit e677394
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
Expand Down Expand Up @@ -347,8 +347,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
.map(i => InternalRow(i)) :: Nil
val rdd = if (start == end || (start < end ^ 0 < step)) {
new EmptyRDD[InternalRow](sqlContext.sparkContext)
} else {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
}
rdd :: Nil
}

protected override def doProduce(ctx: CodegenContext): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil)
}
}

test("SPARK-21041 SparkSession.range()'s behavior is inconsistent with SparkContext.range()") {
val start = java.lang.Long.MAX_VALUE - 3
val end = java.lang.Long.MIN_VALUE + 2
Seq("false", "true").foreach { value =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) {
assert(spark.range(start, end, 1).collect.length == 0)
assert(spark.range(start, start, 1).collect.length == 0)
}
}
}
}

object DataFrameRangeSuite {
Expand Down

0 comments on commit e677394

Please sign in to comment.