Merge branch 'sliding' into auc
mengxr committed Mar 31, 2014
2 parents c1c6c22 + a9b250a commit a920865
Showing 5 changed files with 115 additions and 50 deletions.
16 changes: 0 additions & 16 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -951,22 +951,6 @@ abstract class RDD[T: ClassTag](
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

/**
* Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
* window over them. The ordering is first based on the partition index and then the ordering of
* items within each partition. This is similar to sliding in Scala collections, except that it
* becomes an empty RDD if the window size is greater than the total number of items. It needs to
* trigger a Spark job if the parent RDD has more than one partitions and the window size is
* greater than 1.
*/
def sliding(windowSize: Int): RDD[Array[T]] = {
if (windowSize == 1) {
this.map(Array(_))
} else {
new SlidedRDD[T](this, windowSize)
}
}

/**
* Save this RDD as a text file, using string representations of elements.
*/
14 changes: 0 additions & 14 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -553,18 +553,4 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val ids = ranked.map(_._1).distinct().collect()
assert(ids.length === n)
}

test("sliding") {
val data = 0 until 6
for (numPartitions <- 1 to 8) {
val rdd = sc.parallelize(data, numPartitions)
for (windowSize <- 1 to 6) {
val slided = rdd.sliding(windowSize).collect().map(_.toList).toList
val expected = data.sliding(windowSize).map(_.toList).toList
assert(slided === expected)
}
assert(rdd.sliding(7).collect().isEmpty,
"Should return an empty RDD if the window size is greater than the number of items.")
}
}
}
53 changes: 53 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.rdd

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

/**
* Machine learning specific RDD functions.
*/
private[mllib]
class RDDFunctions[T: ClassTag](self: RDD[T]) {

/**
* Returns an RDD from grouping items of its parent RDD into fixed-size blocks by passing a
* sliding window over them. The ordering is first based on the partition index and then on the
* ordering of items within each partition. This is similar to sliding in Scala collections,
* except that it becomes an empty RDD if the window size is greater than the total number of
* items. It needs to trigger a Spark job if the parent RDD has more than one partition and the
* window size is greater than 1.
*/
def sliding(windowSize: Int): RDD[Seq[T]] = {
require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.")
if (windowSize == 1) {
self.map(Seq(_))
} else {
new SlidingRDD[T](self, windowSize)
}
}
}

private[mllib]
object RDDFunctions {

/** Implicit conversion from an RDD to RDDFunctions. */
implicit def fromRDD[T: ClassTag](rdd: RDD[T]) = new RDDFunctions[T](rdd)
}
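
A minimal usage sketch of the new API (not part of the diff), assuming a live SparkContext named sc: importing RDDFunctions._ brings the implicit conversion into scope, so sliding can be called directly on any RDD.

import org.apache.spark.mllib.rdd.RDDFunctions._

// Assumes `sc` is an existing SparkContext.
val rdd = sc.parallelize(0 until 6, 3)

// Windows of size 3, ordered by partition index and then by position within
// each partition: [0,1,2], [1,2,3], [2,3,4], [3,4,5].
val windows = rdd.sliding(3).collect()
windows.foreach(w => println(w.mkString("[", ",", "]")))

// A window larger than the total number of items yields an empty RDD.
assert(rdd.sliding(7).collect().isEmpty)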
42 changes: 22 additions & 20 deletions core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala → mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
@@ -15,15 +15,16 @@
* limitations under the License.
*/

package org.apache.spark.rdd
package org.apache.spark.mllib.rdd

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.{TaskContext, Partition}
import org.apache.spark.rdd.RDD

private[spark]
class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
private[mllib]
class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T])
extends Partition with Serializable {
override val index: Int = idx
}
@@ -33,49 +34,50 @@ class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T
* window over them. The ordering is first based on the partition index and then the ordering of
* items within each partition. This is similar to sliding in Scala collections, except that it
* becomes an empty RDD if the window size is greater than the total number of items. It needs to
* trigger a Spark job if the parent RDD has more than one partitions.
* trigger a Spark job if the parent RDD has more than one partition. To make this operation
* efficient, the number of items per partition should be larger than the window size and the
* window size should be small, e.g., 2.
*
* @param parent the parent RDD
* @param windowSize the window size, must be greater than 1
*
* @see [[org.apache.spark.rdd.RDD#sliding]]
* @see [[org.apache.spark.mllib.rdd.RDDFunctions#sliding]]
*/
private[spark]
class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
extends RDD[Array[T]](parent) {
private[mllib]
class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
extends RDD[Seq[T]](parent) {

require(windowSize > 1, "Window size must be greater than 1.")
require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")

override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
val part = split.asInstanceOf[SlidedRDDPartition[T]]
override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
val part = split.asInstanceOf[SlidingRDDPartition[T]]
(firstParent[T].iterator(part.prev, context) ++ part.tail)
.sliding(windowSize)
.map(_.toArray)
.filter(_.size == windowSize)
.withPartial(false)
}

override def getPreferredLocations(split: Partition): Seq[String] =
firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev)
firstParent[T].preferredLocations(split.asInstanceOf[SlidingRDDPartition[T]].prev)

override def getPartitions: Array[Partition] = {
val parentPartitions = parent.partitions
val n = parentPartitions.size
if (n == 0) {
Array.empty
} else if (n == 1) {
Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty))
Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty))
} else {
val n1 = n - 1
val w1 = windowSize - 1
// Get the first w1 items of each partition, starting from the second partition.
val nextHeads =
parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]()
val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
var i = 0
var partitionIndex = 0
while (i < n1) {
var j = i
val tail = mutable.ArrayBuffer[T]()
val tail = mutable.ListBuffer[T]()
// Keep appending to the current tail until a head of full size w1 has been appended.
while (j < n1 && nextHeads(j).size < w1) {
tail ++= nextHeads(j)
@@ -85,14 +87,14 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
tail ++= nextHeads(j)
j += 1
}
partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail)
partitionIndex += 1
// Skip appended heads.
i = j
}
// If the head of the last partition has size w1, we also need to add this partition.
if (nextHeads(n1 - 1).size == w1) {
partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
if (nextHeads.last.size == w1) {
partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty)
}
partitions.toArray
}
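
To make the boundary handling above concrete, here is a small local sketch (not part of the diff) of the idea behind SlidingRDD: each partition emits windows over its own elements followed by a tail of up to windowSize - 1 elements borrowed from the partitions after it, so windows that cross partition boundaries are neither lost nor duplicated. This is an illustrative re-implementation on plain Scala collections, not the distributed code.

// Local illustration of the SlidingRDD strategy; `parts` stands in for the
// parent RDD's partitions, in order.
def localSliding[T](parts: Seq[Seq[T]], windowSize: Int): Seq[Seq[T]] = {
  require(windowSize > 1, "Window size must be greater than 1.")
  val w1 = windowSize - 1
  parts.indices.flatMap { i =>
    // Tail borrowed from the following partitions, mirroring the job that
    // getPartitions runs to collect the first w1 items of each later partition.
    val tail = parts.drop(i + 1).flatten.take(w1)
    (parts(i) ++ tail).iterator.sliding(windowSize).withPartial(false).map(_.toList).toList
  }
}

// Example: 0..5 split into partitions [0,1], [2,3], [4,5]; window size 2
// yields [0,1], [1,2], [2,3], [3,4], [4,5], matching data.sliding(2).
val parts = Seq(Seq(0, 1), Seq(2, 3), Seq(4, 5))
assert(localSliding(parts, 2) == (0 until 6).sliding(2).map(_.toList).toList)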
40 changes: 40 additions & 0 deletions mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.rdd

import org.scalatest.FunSuite

import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.rdd.RDDFunctions._

class RDDFunctionsSuite extends FunSuite with LocalSparkContext {

test("sliding") {
val data = 0 until 6
for (numPartitions <- 1 to 8) {
val rdd = sc.parallelize(data, numPartitions)
for (windowSize <- 1 to 6) {
val slided = rdd.sliding(windowSize).collect().map(_.toList).toList
val expected = data.sliding(windowSize).map(_.toList).toList
assert(slided === expected)
}
assert(rdd.sliding(7).collect().isEmpty,
"Should return an empty RDD if the window size is greater than the number of items.")
}
}
}
