Skip to content

Commit

Permalink
Merge pull request apache#406 from eklavya/master
Browse files Browse the repository at this point in the history
Extending Java API coverage

Hi,

I have added three new methods to JavaRDD.

Please review and merge.
  • Loading branch information
JoshRosen committed Jan 23, 2014
2 parents a2b47da + 60e7457 commit fad6aac
Showing 1 changed file with 39 additions and 1 deletion.
40 changes: 39 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,53 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * @param f function invoked with a Java iterator over one partition's elements; its
 *          result's `iterator()` supplies the output elements
 * @param preservesPartitioning forwarded unchanged to the wrapped `rdd.mapPartitions` call
 */
def mapPartitions[U](
    f: FlatMapFunction[java.util.Iterator[T], U],
    preservesPartitioning: Boolean): JavaRDD[U] = {
  // Adapt the Java-facing callback to the Scala iterator signature used by the wrapped RDD.
  val perPartition = (elems: Iterator[T]) =>
    asScalaIterator(f.apply(asJavaIterator(elems)).iterator())
  val mapped = rdd.mapPartitions(perPartition, preservesPartitioning)(f.elementType())
  JavaRDD.fromRDD(mapped)(f.elementType())
}

/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * @param f function invoked with a Java iterator over one partition's elements; its
 *          result's `iterator()` supplies boxed doubles, which are unboxed for the result
 */
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
  // Adapt the Java-facing callback to the Scala iterator signature used by the wrapped RDD.
  val perPartition = (elems: Iterator[T]) =>
    asScalaIterator(f.apply(asJavaIterator(elems)).iterator())
  // Convert java.lang.Double to the primitive Double before wrapping in JavaDoubleRDD.
  val unboxed = rdd.mapPartitions(perPartition)
    .map((boxed: java.lang.Double) => boxed.doubleValue())
  new JavaDoubleRDD(unboxed)
}

/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * @param f function invoked with a Java iterator over one partition's elements; its
 *          result's `iterator()` supplies the elements of the resulting pair RDD
 */
def mapPartitions[K2, V2](
    f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): JavaPairRDD[K2, V2] = {
  // Adapt the Java-facing callback to the Scala iterator signature used by the wrapped RDD.
  val perPartition = (elems: Iterator[T]) =>
    asScalaIterator(f.apply(asJavaIterator(elems)).iterator())
  val pairs = rdd.mapPartitions(perPartition)
  JavaPairRDD.fromRDD(pairs)(f.keyType(), f.valueType())
}


/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * @param f function invoked with a Java iterator over one partition's elements; its
 *          result's `iterator()` supplies boxed doubles, which are unboxed for the result
 * @param preservesPartitioning forwarded unchanged to the wrapped `rdd.mapPartitions` call
 */
def mapPartitions(
    f: DoubleFlatMapFunction[java.util.Iterator[T]],
    preservesPartitioning: Boolean): JavaDoubleRDD = {
  // Adapt the Java-facing callback to the Scala iterator signature used by the wrapped RDD.
  val perPartition = (elems: Iterator[T]) =>
    asScalaIterator(f.apply(asJavaIterator(elems)).iterator())
  // Convert java.lang.Double to the primitive Double before wrapping in JavaDoubleRDD.
  val unboxed = rdd.mapPartitions(perPartition, preservesPartitioning)
    .map((boxed: java.lang.Double) => boxed.doubleValue())
  new JavaDoubleRDD(unboxed)
}

/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * @param f function invoked with a Java iterator over one partition's elements; its
 *          result's `iterator()` supplies the elements of the resulting pair RDD
 * @param preservesPartitioning forwarded unchanged to the wrapped `rdd.mapPartitions` call
 */
def mapPartitions[K2, V2](
    f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
    preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
  // Adapt the Java-facing callback to the Scala iterator signature used by the wrapped RDD.
  val perPartition = (elems: Iterator[T]) =>
    asScalaIterator(f.apply(asJavaIterator(elems)).iterator())
  val pairs = rdd.mapPartitions(perPartition, preservesPartitioning)
  JavaPairRDD.fromRDD(pairs)(f.keyType(), f.valueType())
}

/**
 * Applies a function f to each partition of this RDD.
 *
 * @param f callback that receives a Java iterator over a partition's elements
 */
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]): Unit = {
  rdd.foreachPartition { elems =>
    // Hand the callback a Java view of the Scala iterator supplied by the wrapped RDD.
    f(asJavaIterator(elems))
  }
}

/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
Expand Down Expand Up @@ -461,4 +494,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
 * Delegates to `countApproxDistinct` on the wrapped RDD and returns its `Long` result.
 *
 * @param relativeSD passed through unchanged; defaults to 0.05 (presumably the target
 *                   relative standard deviation of the estimate -- confirm against RDD docs)
 */
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
  rdd.countApproxDistinct(relativeSD)
}

/** Returns the `name` of the wrapped RDD. */
def name(): String = {
  rdd.name
}

/**
 * Reset generator by forwarding `_generator` to `setGenerator` on the wrapped RDD.
 *
 * The result type is inferred from the wrapped call.
 *
 * @param _generator value handed unchanged to the wrapped RDD
 */
def setGenerator(_generator: String) = rdd.setGenerator(_generator)
}

0 comments on commit fad6aac

Please sign in to comment.