Skip to content

Commit

Permalink
change RDD.sliding return type to RDD[Seq[T]]
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Mar 15, 2014
1 parent 284d991 commit 9916202
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -958,9 +958,9 @@ abstract class RDD[T: ClassTag](
 * trigger a Spark job if the parent RDD has more than one partition and the window size is
* greater than 1.
*/
def sliding(windowSize: Int): RDD[Array[T]] = {
def sliding(windowSize: Int): RDD[Seq[T]] = {
if (windowSize == 1) {
this.map(Array(_))
this.map(Seq(_))
} else {
new SlidingRDD[T](this, windowSize)
}
Expand Down
17 changes: 8 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.{TaskContext, Partition}

private[spark]
class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T])
extends Partition with Serializable {
override val index: Int = idx
}
Expand All @@ -42,16 +42,15 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[
*/
private[spark]
class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
extends RDD[Array[T]](parent) {
extends RDD[Seq[T]](parent) {

require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")

override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
val part = split.asInstanceOf[SlidingRDDPartition[T]]
(firstParent[T].iterator(part.prev, context) ++ part.tail)
.sliding(windowSize)
.map(_.toArray)
.filter(_.size == windowSize)
.withPartial(false)
}

override def getPreferredLocations(split: Partition): Seq[String] =
Expand All @@ -63,7 +62,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
if (n == 0) {
Array.empty
} else if (n == 1) {
Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty))
Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty))
} else {
val n1 = n - 1
val w1 = windowSize - 1
Expand All @@ -75,7 +74,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
var partitionIndex = 0
while (i < n1) {
var j = i
val tail = mutable.ArrayBuffer[T]()
val tail = mutable.ListBuffer[T]()
// Keep appending to the current tail until appended a head of size w1.
while (j < n1 && nextHeads(j).size < w1) {
tail ++= nextHeads(j)
Expand All @@ -85,14 +84,14 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
tail ++= nextHeads(j)
j += 1
}
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq)
partitionIndex += 1
// Skip appended heads.
i = j
}
// If the head of last partition has size w1, we also need to add this partition.
if (nextHeads(n1 - 1).size == w1) {
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty)
}
partitions.toArray
}
Expand Down

0 comments on commit 9916202

Please sign in to comment.