diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index dfffa50cb2aed..debe57883ed33 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -958,9 +958,9 @@ abstract class RDD[T: ClassTag]( * trigger a Spark job if the parent RDD has more than one partitions and the window size is * greater than 1. */ - def sliding(windowSize: Int): RDD[Array[T]] = { + def sliding(windowSize: Int): RDD[Seq[T]] = { if (windowSize == 1) { - this.map(Array(_)) + this.map(Seq(_)) } else { new SlidingRDD[T](this, windowSize) } diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala index ac0016b18298d..96e3442b878c1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala @@ -23,7 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.{TaskContext, Partition} private[spark] -class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T]) +class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]) extends Partition with Serializable { override val index: Int = idx } @@ -42,16 +42,15 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[ */ private[spark] class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) - extends RDD[Array[T]](parent) { + extends RDD[Seq[T]](parent) { require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.") - override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = { + override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = { val part = split.asInstanceOf[SlidingRDDPartition[T]] (firstParent[T].iterator(part.prev, context) ++ part.tail) .sliding(windowSize) - .map(_.toArray) - .filter(_.size == windowSize) + .withPartial(false) } override def getPreferredLocations(split: Partition): Seq[String] = @@ -63,7 +62,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int if (n == 0) { Array.empty } else if (n == 1) { - Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty)) + Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty)) } else { val n1 = n - 1 val w1 = windowSize - 1 @@ -75,7 +74,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int var partitionIndex = 0 while (i < n1) { var j = i - val tail = mutable.ArrayBuffer[T]() + val tail = mutable.ListBuffer[T]() // Keep appending to the current tail until appended a head of size w1. while (j < n1 && nextHeads(j).size < w1) { tail ++= nextHeads(j) @@ -85,14 +84,14 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int tail ++= nextHeads(j) j += 1 } - partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray) + partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq) partitionIndex += 1 // Skip appended heads. i = j } // If the head of last partition has size w1, we also need to add this partition. if (nextHeads(n1 - 1).size == w1) { - partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty) + partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty) } partitions.toArray }