Skip to content

Commit

Permalink
change SlidedRDD to SlidingRDD
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Mar 15, 2014
1 parent 5ee6001 commit 284d991
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ abstract class RDD[T: ClassTag](
if (windowSize == 1) {
this.map(Array(_))
} else {
new SlidedRDD[T](this, windowSize)
new SlidingRDD[T](this, windowSize)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.{TaskContext, Partition}

private[spark]
class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
extends Partition with Serializable {
override val index: Int = idx
}
Expand All @@ -41,36 +41,36 @@ class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T
* @see [[org.apache.spark.rdd.RDD#sliding]]
*/
private[spark]
class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
extends RDD[Array[T]](parent) {

require(windowSize > 1, "Window size must be greater than 1.")
require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")

override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
val part = split.asInstanceOf[SlidedRDDPartition[T]]
val part = split.asInstanceOf[SlidingRDDPartition[T]]
(firstParent[T].iterator(part.prev, context) ++ part.tail)
.sliding(windowSize)
.map(_.toArray)
.filter(_.size == windowSize)
}

override def getPreferredLocations(split: Partition): Seq[String] =
firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev)
firstParent[T].preferredLocations(split.asInstanceOf[SlidingRDDPartition[T]].prev)

override def getPartitions: Array[Partition] = {
val parentPartitions = parent.partitions
val n = parentPartitions.size
if (n == 0) {
Array.empty
} else if (n == 1) {
Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty))
Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty))
} else {
val n1 = n - 1
val w1 = windowSize - 1
// Get the first w1 items of each partition, starting from the second partition.
val nextHeads =
parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]()
val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
var i = 0
var partitionIndex = 0
while (i < n1) {
Expand All @@ -85,14 +85,14 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
tail ++= nextHeads(j)
j += 1
}
partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
partitionIndex += 1
// Skip appended heads.
i = j
}
// If the head of last partition has size w1, we also need to add this partition.
if (nextHeads(n1 - 1).size == w1) {
partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
}
partitions.toArray
}
Expand Down

0 comments on commit 284d991

Please sign in to comment.