Skip to content

Commit

Permalink
[SPARK-2911]: provide rdd.parent[T](j) to obtain jth parent RDD
Browse files Browse the repository at this point in the history
Author: Erik Erlandson <[email protected]>

Closes apache#1841 from erikerlandson/spark-2911-pr and squashes the following commits:

4699e2f [Erik Erlandson] [SPARK-2911]: provide rdd.parent[T](j) to obtain jth parent RDD
  • Loading branch information
erikerlandson authored and conviva-zz committed Sep 4, 2014
1 parent 7e74492 commit ffcab1e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,11 @@ abstract class RDD[T: ClassTag](
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

/** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T].
 *
 * @tparam U the element type of the requested parent RDD; this is an unchecked cast,
 *           so the caller must know the parent's actual element type (same contract
 *           as firstParent)
 * @param j zero-based index into this RDD's dependencies
 * @return the RDD backing the jth dependency, cast to RDD[U]
 * @throws IndexOutOfBoundsException if j is not a valid index into dependencies
 */
protected[spark] def parent[U: ClassTag](j: Int): RDD[U] = {
  // Explicit return type added: public/spark-visible API members should not rely
  // on inference. The cast mirrors firstParent's implementation above.
  dependencies(j).rdd.asInstanceOf[RDD[U]]
}

/** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
def context = sc

Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,16 @@ class RDDSuite extends FunSuite with SharedSparkContext {
jrdd.rdd.retag.collect()
}

// Verifies that parent(j) returns the jth dependency RDD with the expected concrete type,
// using a UnionRDD whose three parents each have a distinct RDD subclass.
test("parent method") {
  val base = sc.parallelize(1 to 10, 2)
  val evens = base.filter(_ % 2 == 0)
  val bumped = evens.map(_ + 1)
  val union = new UnionRDD(sc, List(base, evens, bumped))
  assert(union.parent(0).isInstanceOf[ParallelCollectionRDD[_]])
  assert(union.parent(1).isInstanceOf[FilteredRDD[_]])
  assert(union.parent(2).isInstanceOf[MappedRDD[_, _]])
}

test("getNarrowAncestors") {
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
Expand Down

0 comments on commit ffcab1e

Please sign in to comment.