diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index debe57883ed33..4afa7523dd802 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -950,22 +950,6 @@ abstract class RDD[T: ClassTag](
    */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
 
-  /**
-   * Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
-   * window over them. The ordering is first based on the partition index and then the ordering of
-   * items within each partition. This is similar to sliding in Scala collections, except that it
-   * becomes an empty RDD if the window size is greater than the total number of items. It needs to
-   * trigger a Spark job if the parent RDD has more than one partitions and the window size is
-   * greater than 1.
-   */
-  def sliding(windowSize: Int): RDD[Seq[T]] = {
-    if (windowSize == 1) {
-      this.map(Seq(_))
-    } else {
-      new SlidingRDD[T](this, windowSize)
-    }
-  }
-
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index a5962406b2e1a..60bcada55245b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -553,18 +553,4 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val ids = ranked.map(_._1).distinct().collect()
     assert(ids.length === n)
   }
-
-  test("sliding") {
-    val data = 0 until 6
-    for (numPartitions <- 1 to 8) {
-      val rdd = sc.parallelize(data, numPartitions)
-      for (windowSize <- 1 to 6) {
-        val slided = rdd.sliding(windowSize).collect().map(_.toList).toList
-        val expected = data.sliding(windowSize).map(_.toList).toList
-        assert(slided === expected)
-      }
-      assert(rdd.sliding(7).collect().isEmpty,
-        "Should return an empty RDD if the window size is greater than the number of items.")
-    }
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
new file mode 100644
index 0000000000000..873de871fd884
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Machine learning specific RDD functions.
+ */
+private[mllib]
+class RDDFunctions[T: ClassTag](self: RDD[T]) {
+
+  /**
+   * Returns an RDD from grouping items of its parent RDD in fixed-size blocks by passing a sliding
+   * window over them. The ordering is first based on the partition index and then the ordering of
+   * items within each partition. This is similar to sliding in Scala collections, except that it
+   * becomes an empty RDD if the window size is greater than the total number of items. It needs to
+   * trigger a Spark job if the parent RDD has more than one partition and the window size is
+   * greater than 1.
+   */
+  def sliding(windowSize: Int): RDD[Seq[T]] = {
+    require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.")
+    if (windowSize == 1) {
+      self.map(Seq(_))
+    } else {
+      new SlidingRDD[T](self, windowSize)
+    }
+  }
+}
+
+private[mllib]
+object RDDFunctions {
+
+  /** Implicit conversion from an RDD to RDDFunctions. */
+  implicit def fromRDD[T: ClassTag](rdd: RDD[T]) = new RDDFunctions[T](rdd)
+}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
similarity index 92%
rename from core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
rename to mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
index df87bc5459699..dd80782c0f001 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.rdd
+package org.apache.spark.mllib.rdd
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.rdd.RDD
 
-private[spark]
+private[mllib]
 class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T])
   extends Partition with Serializable {
   override val index: Int = idx
@@ -33,14 +34,16 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]
  * window over them. The ordering is first based on the partition index and then the ordering of
  * items within each partition. This is similar to sliding in Scala collections, except that it
  * becomes an empty RDD if the window size is greater than the total number of items. It needs to
- * trigger a Spark job if the parent RDD has more than one partitions.
+ * trigger a Spark job if the parent RDD has more than one partition. To make this operation
+ * efficient, the number of items per partition should be larger than the window size and the
+ * window size should be small, e.g., 2.
  *
  * @param parent the parent RDD
  * @param windowSize the window size, must be greater than 1
  *
- * @see [[org.apache.spark.rdd.RDD#sliding]]
+ * @see [[org.apache.spark.mllib.rdd.RDDFunctions#sliding]]
  */
-private[spark]
+private[mllib]
 class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
   extends RDD[Seq[T]](parent) {
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
new file mode 100644
index 0000000000000..a3245d731a15b
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.rdd.RDDFunctions._
+
+class RDDFunctionsSuite extends FunSuite with LocalSparkContext {
+
+  test("sliding") {
+    val data = 0 until 6
+    for (numPartitions <- 1 to 8) {
+      val rdd = sc.parallelize(data, numPartitions)
+      for (windowSize <- 1 to 6) {
+        val slided = rdd.sliding(windowSize).collect().map(_.toList).toList
+        val expected = data.sliding(windowSize).map(_.toList).toList
+        assert(slided === expected)
+      }
+      assert(rdd.sliding(7).collect().isEmpty,
+        "Should return an empty RDD if the window size is greater than the number of items.")
+    }
+  }
+}
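
For context, here is a minimal usage sketch of the relocated API after this patch. It is not part of the diff: the `sc` SparkContext, the sample values, and the consecutive-difference use case are illustrative assumptions.

```scala
import org.apache.spark.mllib.rdd.RDDFunctions._

// Hypothetical data; `sc` is assumed to be an existing SparkContext.
val values = sc.parallelize(Seq(1.0, 4.0, 9.0, 16.0), 2)

// sliding(2) yields Seq(1.0, 4.0), Seq(4.0, 9.0), Seq(9.0, 16.0), ordered first by
// partition index and then by the position of items within each partition.
val deltas = values.sliding(2).map { case Seq(prev, curr) => curr - prev }

deltas.collect() // Array(3.0, 5.0, 7.0)
```

Because the window size (2) is small relative to the number of items per partition, this usage stays within the efficiency guidance added to the SlidingRDD scaladoc above.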