[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters

Replace `JavaConversions` implicits with `JavaConverters`

Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code, as far as I can see yet.
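
For illustration only (not part of this patch), a minimal sketch of the difference the migration is about: `JavaConversions` converts Java and Scala collections implicitly and invisibly at the use site, while `JavaConverters` makes each conversion explicit via `.asScala` / `.asJava`. The object and value names below are made up for the example.

    import scala.collection.JavaConverters._

    object ConvertersExample {
      def main(args: Array[String]): Unit = {
        val javaList = new java.util.ArrayList[String]()
        javaList.add("a")
        javaList.add("b")

        // JavaConverters: the conversion is explicit and visible at the call site.
        val asBuffer: scala.collection.mutable.Buffer[String] = javaList.asScala
        val backToJava: java.util.List[String] = asBuffer.asJava

        // With JavaConversions (now banned), the same wrapping would happen implicitly,
        // e.g. `val b: mutable.Buffer[String] = javaList`, hiding the conversion
        // and making accidental or repeated wrapping easy to miss.
        println(asBuffer.mkString(", ") + " / " + backToJava.size())
      }
    }
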

Author: Sean Owen <[email protected]>

Closes #8033 from srowen/SPARK-9613.
srowen committed Aug 25, 2015
1 parent 7f1e507 commit 69c9c17
Showing 171 changed files with 863 additions and 880 deletions.
@@ -24,7 +24,7 @@
 
 import scala.Option;
 import scala.Product2;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 import scala.collection.immutable.Map;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
@@ -160,7 +160,7 @@ public long getPeakMemoryUsedBytes() {
    */
   @VisibleForTesting
   public void write(Iterator<Product2<K, V>> records) throws IOException {
-    write(JavaConversions.asScalaIterator(records));
+    write(JavaConverters.asScalaIteratorConverter(records).asScala());
   }
 
   @Override
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (2 additions, 2 deletions)

@@ -21,8 +21,8 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
-import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, RpcEndpoint}
@@ -398,7 +398,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  */
 private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
   protected val mapStatuses: Map[Int, Array[MapStatus]] =
-    new ConcurrentHashMap[Int, Array[MapStatus]]
+    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
 }
 
 private[spark] object MapOutputTracker extends Logging {
core/src/main/scala/org/apache/spark/SSLOptions.scala (6 additions, 5 deletions)

@@ -17,9 +17,11 @@
 
 package org.apache.spark
 
-import java.io.{File, FileInputStream}
-import java.security.{KeyStore, NoSuchAlgorithmException}
-import javax.net.ssl.{KeyManager, KeyManagerFactory, SSLContext, TrustManager, TrustManagerFactory}
+import java.io.File
+import java.security.NoSuchAlgorithmException
+import javax.net.ssl.SSLContext
+
+import scala.collection.JavaConverters._
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.eclipse.jetty.util.ssl.SslContextFactory
@@ -79,7 +81,6 @@ private[spark] case class SSLOptions(
   * object. It can be used then to compose the ultimate Akka configuration.
   */
  def createAkkaConfig: Option[Config] = {
-    import scala.collection.JavaConversions._
    if (enabled) {
      Some(ConfigFactory.empty()
        .withValue("akka.remote.netty.tcp.security.key-store",
@@ -97,7 +98,7 @@ private[spark] case class SSLOptions(
        .withValue("akka.remote.netty.tcp.security.protocol",
          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
-          ConfigValueFactory.fromIterable(supportedAlgorithms.toSeq))
+          ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
        .withValue("akka.remote.netty.tcp.enable-ssl",
          ConfigValueFactory.fromAnyRef(true)))
    } else {
core/src/main/scala/org/apache/spark/SparkContext.scala (2 additions, 2 deletions)

@@ -26,8 +26,8 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger}
 import java.util.UUID.randomUUID
 
+import scala.collection.JavaConverters._
 import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -1546,7 +1546,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  def getAllPools: Seq[Schedulable] = {
    assertNotStopped()
    // TODO(xiajunluan): We should take nested pools into account
-    taskScheduler.rootPool.schedulableQueue.toSeq
+    taskScheduler.rootPool.schedulableQueue.asScala.toSeq
  }
 
  /**
core/src/main/scala/org/apache/spark/TestUtils.scala (5 additions, 4 deletions)

@@ -19,11 +19,12 @@ package org.apache.spark
 
 import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{URI, URL}
+import java.nio.charset.StandardCharsets
+import java.util.Arrays
 import java.util.jar.{JarEntry, JarOutputStream}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
-import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 
@@ -71,7 +72,7 @@ private[spark] object TestUtils {
    files.foreach { case (k, v) =>
      val entry = new JarEntry(k)
      jarStream.putNextEntry(entry)
-      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)), jarStream)
    }
    jarStream.close()
    jarFile.toURI.toURL
@@ -125,7 +126,7 @@ private[spark] object TestUtils {
    } else {
      Seq()
    }
-    compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call()
+    compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call()
 
    val fileName = className + ".class"
    val result = new File(fileName)
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.mapred.InputSplit
@@ -37,7 +37,7 @@ class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
  def mapPartitionsWithInputSplit[R](
      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
      preservesPartitioning: Boolean = false): JavaRDD[R] = {
-    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, b.asJava).asScala,
      preservesPartitioning)(fakeClassTag))(fakeClassTag)
  }
}
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.mapreduce.InputSplit
@@ -37,7 +37,7 @@ class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
  def mapPartitionsWithInputSplit[R](
      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
      preservesPartitioning: Boolean = false): JavaRDD[R] = {
-    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, b.asJava).asScala,
      preservesPartitioning)(fakeClassTag))(fakeClassTag)
  }
}
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala (8 additions, 11 deletions)

@@ -20,7 +20,7 @@ package org.apache.spark.api.java
 import java.util.{Comparator, List => JList, Map => JMap}
 import java.lang.{Iterable => JIterable}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
@@ -142,7 +142,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
  def sampleByKey(withReplacement: Boolean,
      fractions: JMap[K, Double],
      seed: Long): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))
+    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed))
 
  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
@@ -173,7 +173,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
  def sampleByKeyExact(withReplacement: Boolean,
      fractions: JMap[K, Double],
      seed: Long): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions, seed))
+    new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed))
 
  /**
   * ::Experimental::
@@ -768,7 +768,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
-  def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key))
+  def lookup(key: K): JList[V] = rdd.lookup(key).asJava
 
  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
@@ -987,30 +987,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
object JavaPairRDD {
  private[spark]
  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
-    rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
+    rddToPairRDDFunctions(rdd).mapValues(_.asJava)
  }
 
  private[spark]
  def cogroupResultToJava[K: ClassTag, V, W](
      rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
-    rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
+    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava))
  }
 
  private[spark]
  def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
-    rddToPairRDDFunctions(rdd)
-      .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
+    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava))
  }
 
  private[spark]
  def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
-    rddToPairRDDFunctions(rdd)
-      .mapValues(x =>
-        (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
+    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava, x._4.asJava))
  }
 
  def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {